12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327
73278327932803281328232833284328532863287328832893290329132923293329432953296329732983299330033013302330333043305330633073308330933103311331233133314331533163317331833193320332133223323332433253326332733283329333033313332333333343335333633373338333933403341334233433344334533463347334833493350335133523353335433553356335733583359336033613362336333643365336633673368336933703371337233733374337533763377337833793380338133823383338433853386338733883389339033913392339333943395339633973398339934003401340234033404340534063407340834093410341134123413341434153416341734183419342034213422342334243425342634273428342934303431343234333434343534363437343834393440344134423443344434453446344734483449345034513452345334543455345634573458345934603461346234633464346534663467346834693470347134723473347434753476347734783479348034813482348334843485348634873488348934903491349234933494349534963497349834993500350135023503350435053506350735083509351035113512351335143515351635173518351935203521352235233524352535263527352835293530353135323533353435353536353735383539354035413542354335443545354635473548354935503551355235533554355535563557355835593560356135623563356435653566356735683569357035713572357335743575357635773578357935803581358235833584358535863587358835893590359135923593359435953596359735983599360036013602360336043605360636073608360936103611361236133614361536163617361836193620362136223623362436253626362736283629363036313632363336343635363636373638363936403641364236433644364536463647364836493650365136523653365436553656365736583659366036613662366336643665366636673668366936703671367236733674367536763677367836793680368136823683368436853686368736883689369036913692369336943695369636973698369937003701370237033704370537063707370837093710371137123713371437153716371737183719372037213722372337243725372637273728372937303731373237333734373537363737373837393740374137423743374437453746374737483749375037513752375337543755375637573758375937603761376237633764376537663767376837693770377137723773377437753776377
73778377937803781378237833784378537863787378837893790379137923793379437953796379737983799380038013802380338043805380638073808380938103811381238133814381538163817381838193820382138223823382438253826382738283829383038313832383338343835383638373838383938403841384238433844384538463847384838493850385138523853385438553856385738583859386038613862386338643865386638673868386938703871387238733874387538763877387838793880388138823883388438853886388738883889389038913892389338943895389638973898389939003901390239033904390539063907390839093910391139123913391439153916391739183919392039213922392339243925392639273928392939303931393239333934393539363937393839393940394139423943394439453946394739483949395039513952395339543955395639573958395939603961396239633964396539663967396839693970397139723973397439753976397739783979398039813982398339843985398639873988398939903991399239933994399539963997399839994000400140024003400440054006400740084009401040114012401340144015401640174018401940204021402240234024402540264027402840294030403140324033403440354036403740384039404040414042404340444045404640474048404940504051405240534054405540564057405840594060406140624063406440654066406740684069407040714072407340744075407640774078407940804081408240834084408540864087408840894090409140924093409440954096409740984099410041014102410341044105410641074108410941104111411241134114411541164117411841194120412141224123412441254126412741284129413041314132413341344135413641374138413941404141414241434144414541464147414841494150415141524153415441554156415741584159416041614162416341644165416641674168416941704171417241734174417541764177417841794180418141824183418441854186418741884189419041914192419341944195419641974198419942004201420242034204420542064207420842094210421142124213421442154216421742184219422042214222422342244225422642274228422942304231423242334234423542364237423842394240424142424243424442454246424742484249425042514252425342544255425642574258425942604261426242634264426542664267426842694270427142724273427442754276427
74278427942804281428242834284428542864287428842894290429142924293429442954296429742984299430043014302430343044305430643074308430943104311431243134314431543164317431843194320432143224323432443254326432743284329433043314332433343344335433643374338433943404341434243434344434543464347434843494350435143524353435443554356435743584359436043614362436343644365436643674368436943704371437243734374437543764377437843794380438143824383438443854386438743884389439043914392439343944395439643974398439944004401440244034404440544064407440844094410441144124413441444154416441744184419442044214422442344244425442644274428442944304431443244334434443544364437443844394440444144424443444444454446444744484449445044514452445344544455445644574458445944604461446244634464446544664467446844694470447144724473447444754476447744784479448044814482448344844485448644874488448944904491449244934494449544964497449844994500450145024503450445054506450745084509451045114512451345144515451645174518451945204521452245234524452545264527452845294530453145324533453445354536453745384539454045414542454345444545454645474548454945504551455245534554455545564557455845594560456145624563456445654566456745684569457045714572457345744575457645774578457945804581458245834584458545864587458845894590459145924593459445954596459745984599460046014602460346044605460646074608460946104611461246134614461546164617461846194620462146224623462446254626462746284629463046314632463346344635463646374638463946404641464246434644464546464647464846494650465146524653465446554656465746584659466046614662466346644665466646674668466946704671467246734674467546764677467846794680468146824683468446854686468746884689469046914692469346944695469646974698469947004701470247034704470547064707470847094710471147124713471447154716471747184719472047214722472347244725472647274728472947304731473247334734473547364737473847394740474147424743474447454746474747484749475047514752475347544755475647574758475947604761476247634764476547664767476847694770477147724773477447754776477
747784779478047814782478347844785478647874788478947904791479247934794479547964797479847994800480148024803480448054806480748084809481048114812481348144815481648174818481948204821482248234824482548264827482848294830483148324833483448354836483748384839484048414842484348444845484648474848484948504851485248534854485548564857485848594860486148624863486448654866486748684869487048714872487348744875487648774878487948804881488248834884488548864887488848894890489148924893489448954896489748984899490049014902490349044905490649074908490949104911491249134914491549164917491849194920492149224923492449254926492749284929493049314932493349344935493649374938493949404941494249434944494549464947494849494950495149524953495449554956495749584959496049614962496349644965496649674968496949704971497249734974497549764977497849794980498149824983498449854986498749884989499049914992499349944995499649974998499950005001500250035004500550065007500850095010501150125013501450155016501750185019502050215022502350245025502650275028502950305031503250335034503550365037503850395040504150425043504450455046504750485049505050515052505350545055505650575058505950605061506250635064506550665067506850695070507150725073507450755076507750785079508050815082508350845085508650875088508950905091509250935094509550965097509850995100510151025103510451055106510751085109511051115112511351145115511651175118511951205121512251235124512551265127512851295130513151325133513451355136513751385139514051415142514351445145514651475148514951505151515251535154515551565157515851595160516151625163516451655166516751685169517051715172517351745175517651775178517951805181518251835184518551865187518851895190519151925193519451955196519751985199520052015202520352045205520652075208520952105211521252135214 |
- """
- Tests for the file state
- """
- import errno
- import filecmp
- import logging
- import os
- import pathlib
- import re
- import shutil
- import stat
- import sys
- import tempfile
- import textwrap
- import pytest
- import salt.serializers.configparser
- import salt.serializers.plist
- import salt.utils.data
- import salt.utils.files
- import salt.utils.json
- import salt.utils.path
- import salt.utils.platform
- import salt.utils.stringutils
- from salt.ext import six
- from salt.ext.six.moves import range
- from salt.utils.versions import LooseVersion as _LooseVersion
- from tests.support.case import ModuleCase
- from tests.support.helpers import (
- Webserver,
- dedent,
- destructiveTest,
- requires_system_grains,
- skip_if_not_root,
- with_system_user_and_group,
- with_tempdir,
- with_tempfile,
- )
- from tests.support.mixins import SaltReturnAssertsMixin
- from tests.support.runtests import RUNTIME_VARS
- from tests.support.unit import skipIf
log = logging.getLogger(__name__)

# Record whether the optional ``pwd`` and ``grp`` modules could be imported.
try:
    import pwd

    HAS_PWD = True
except ImportError:
    HAS_PWD = False

try:
    import grp

    HAS_GRP = True
except ImportError:
    HAS_GRP = False

IS_WINDOWS = salt.utils.platform.is_windows()

# Raw bytes of a tiny GIF image (note the ``GIF89a`` signature).
BINARY_FILE = b"GIF89a\x01\x00\x01\x00\x80\x00\x00\x05\x04\x04\x00\x00\x00,\x00\x00\x00\x00\x01\x00\x01\x00\x00\x02\x02D\x01\x00;"

TEST_SYSTEM_USER = "test_system_user"
TEST_SYSTEM_GROUP = "test_system_group"
def _test_managed_file_mode_keep_helper(testcase, local=False):
    """
    Shared body of the "mode: keep" tests.

    Manages a file from the ``grail/scene33`` source with an explicit mode,
    then changes the permissions of the source twice and checks each time
    that ``mode: keep`` carries the new permissions over to the managed file.

    testcase
        The running test case. It supplies ``tmp_dir``, ``run_state``,
        ``run_function``, ``addCleanup`` and the assertion helpers.

    local
        When true the source is passed as a filesystem path, otherwise as
        a ``salt://`` URL.
    """
    name = testcase.tmp_dir / "scene33"
    testcase.addCleanup(salt.utils.files.safe_rm, str(name))

    grail_fs_path = os.path.join(RUNTIME_VARS.BASE_FILES, "grail", "scene33")
    grail = grail_fs_path if local else "salt://grail/scene33"

    # Remember the source's current mode so it can be restored at the end.
    grail_fs_mode = int(testcase.run_function("file.get_mode", [grail_fs_path]), 8)

    # Begin from a mode that differs from both modes used below, so a
    # passing "keep" run really has changed the managed file's permissions.
    ret = testcase.run_state(
        "file.managed", name=str(name), mode=oct(0o770), source=grail,
    )
    if IS_WINDOWS:
        testcase.assertSaltFalseReturn(ret)
        return
    testcase.assertSaltTrueReturn(ret)

    try:
        # Two passes with different modes: even if the source happened to
        # start out with the first mode, the second pass forces a change.
        for source_mode in (0o600, 0o644):
            os.chmod(grail_fs_path, source_mode)
            ret = testcase.run_state(
                "file.managed", name=str(name), mode="keep", source=grail,
            )
            testcase.assertSaltTrueReturn(ret)
            managed_mode = stat.S_IMODE(name.stat().st_mode)
            testcase.assertEqual(oct(managed_mode), oct(source_mode))
    finally:
        # Put the source file's permissions back the way they were found.
        os.chmod(grail_fs_path, grail_fs_mode)
- @pytest.mark.windows_whitelisted
- class FileTest(ModuleCase, SaltReturnAssertsMixin):
- """
- Validate the file state
- """
@classmethod
def setUpClass(cls):
    """
    Create a scratch directory under ``RUNTIME_VARS.TMP`` and store its
    resolved path on the class as ``tmp_dir``
    """
    scratch = tempfile.mkdtemp(dir=RUNTIME_VARS.TMP)
    cls.tmp_dir = pathlib.Path(scratch).resolve()
@classmethod
def tearDownClass(cls):
    """
    Remove the class-level scratch directory stored in ``tmp_dir``
    """
    scratch = str(cls.tmp_dir)
    salt.utils.files.rm_rf(scratch)
def _delete_file(self, path):
    """
    Remove ``path``. A file that is already missing is ignored; any other
    ``OSError`` is logged instead of being raised.
    """
    try:
        os.remove(path)
    except OSError as exc:
        if exc.errno == errno.ENOENT:
            return
        log.error("Failed to remove %s: %s", path, exc)
def tearDown(self):
    """
    Delete the ``salt`` system user if it exists once a test has finished
    """
    user = "salt"
    existing_users = self.run_function("user.list_users")
    # Compare against the individual account names. Searching the
    # stringified result would also match accounts whose name merely
    # contains "salt" (for example "saltmaster") and trigger a pointless
    # delete of a user that does not exist.
    if isinstance(existing_users, (list, tuple, set)) and user in existing_users:
        self.run_function("user.delete", [user])
def test_symlink(self):
    """
    file.symlink creates a link to the requested target
    """
    link = self.tmp_dir / "symlink"
    target = self.tmp_dir / "target"
    if IS_WINDOWS:
        # Windows must have a source directory to link to ...
        if not target.is_dir():
            target.mkdir()
        # ... and cannot create a symlink if it already exists
        if link.is_symlink():
            link.unlink()
    ret = self.run_state("file.symlink", name=str(link), target=str(target))
    self.assertSaltTrueReturn(ret)
def test_test_symlink(self):
    """
    file.symlink run with test=True
    """
    link = str(self.tmp_dir / "symlink2")
    target = str(self.tmp_dir / "target2")
    ret = self.run_state("file.symlink", test=True, name=link, target=target)
    self.assertSaltNoneReturn(ret)
def test_absent_file(self):
    """
    file.absent removes a regular file
    """
    victim = self.tmp_dir / "file_to_kill"
    victim.write_text("killme")
    state_ret = self.run_state("file.absent", name=str(victim))
    self.assertSaltTrueReturn(state_ret)
    self.assertFalse(victim.is_file())
def test_absent_dir(self):
    """
    file.absent removes a directory
    """
    victim = self.tmp_dir / "dir_to_kill"
    victim.mkdir(exist_ok=True)
    state_ret = self.run_state("file.absent", name=str(victim))
    self.assertSaltTrueReturn(state_ret)
    self.assertFalse(victim.is_dir())
def test_absent_link(self):
    """
    file.absent removes a symbolic link
    """
    name = self.tmp_dir / "link_to_kill"
    self.addCleanup(salt.utils.files.safe_rm, str(name))
    tgt = self.tmp_dir / "link_to_kill.tgt"
    self.addCleanup(salt.utils.files.safe_rm, str(tgt))
    # ``Path.symlink_to`` turns the path it is called on into the link, so
    # it must be called on ``name``. Calling it on ``tgt`` would leave
    # ``name`` non-existent and make the state pass without removing
    # anything.
    name.symlink_to(tgt, target_is_directory=IS_WINDOWS)
    # Guard against a vacuous pass: the link has to exist beforehand.
    self.assertTrue(name.is_symlink())
    ret = self.run_state("file.absent", name=str(name))
    self.assertSaltTrueReturn(ret)
    self.assertFalse(name.exists())
    self.assertFalse(name.is_symlink())
@with_tempfile()
def test_test_absent(self, name):
    """
    file.absent run with test=True leaves the file in place
    """
    with salt.utils.files.fopen(name, "w+") as handle:
        handle.write("killme")
    state_ret = self.run_state("file.absent", test=True, name=name)
    self.assertSaltNoneReturn(state_ret)
    self.assertTrue(os.path.isfile(name))
def test_managed(self):
    """
    file.managed copies the source file's contents to the target
    """
    name = self.tmp_dir / "grail_scene33"
    self.addCleanup(salt.utils.files.safe_rm, str(name))
    ret = self.run_state(
        "file.managed", name=str(name), source="salt://grail/scene33"
    )
    source_path = pathlib.Path(RUNTIME_VARS.BASE_FILES, "grail", "scene33")
    # The managed copy must match the file served from the file roots.
    self.assertEqual(source_path.read_text(), name.read_text())
    self.assertSaltTrueReturn(ret)
def test_managed_file_mode(self):
    """
    file.managed applies the requested file permissions
    """
    desired_mode = 0o770
    name = self.tmp_dir / "grail_scene33"
    self.addCleanup(salt.utils.files.safe_rm, str(name))
    ret = self.run_state(
        "file.managed", name=str(name), mode="0770", source="salt://grail/scene33"
    )

    if IS_WINDOWS:
        first_key = list(ret)[0]
        self.assertEqual(
            ret[first_key]["comment"],
            "The 'mode' option is not supported on Windows",
        )
        self.assertSaltFalseReturn(ret)
        return

    resulting_mode = stat.S_IMODE(name.stat().st_mode)
    self.assertEqual(oct(desired_mode), oct(resulting_mode))
    self.assertSaltTrueReturn(ret)
@skipIf(IS_WINDOWS, "Windows does not report any file modes. Skipping.")
def test_managed_file_mode_keep(self):
    """
    "mode: keep" in a file.managed state with a salt:// source
    """
    # ``local`` defaults to False, which selects the salt:// source.
    _test_managed_file_mode_keep_helper(self)
@skipIf(IS_WINDOWS, "Windows does not report any file modes. Skipping.")
def test_managed_file_mode_keep_local_source(self):
    """
    "mode: keep" in a file.managed state whose source is a local
    filesystem path
    """
    use_local_source = True
    _test_managed_file_mode_keep_helper(self, local=use_local_source)
def test_managed_file_mode_file_exists_replace(self):
    """
    file.managed on an existing file with replace=True changes its
    permissions
    """
    initial_mode = 0o770
    desired_mode = 0o600
    source = "salt://grail/scene33"
    name = self.tmp_dir / "grail_scene33"
    self.addCleanup(salt.utils.files.safe_rm, str(name))

    # First run: create the file with the initial mode.
    ret = self.run_state(
        "file.managed", name=str(name), mode=oct(initial_mode), source=source,
    )

    if IS_WINDOWS:
        first_key = list(ret)[0]
        self.assertEqual(
            ret[first_key]["comment"],
            "The 'mode' option is not supported on Windows",
        )
        self.assertSaltFalseReturn(ret)
        return

    resulting_mode = stat.S_IMODE(name.stat().st_mode)
    self.assertEqual(oct(initial_mode), oct(resulting_mode))

    # Second run: same file, new mode, replace enabled.
    ret = self.run_state(
        "file.managed",
        name=str(name),
        replace=True,
        mode=oct(desired_mode),
        source=source,
    )
    resulting_mode = stat.S_IMODE(name.stat().st_mode)
    self.assertEqual(oct(desired_mode), oct(resulting_mode))
    self.assertSaltTrueReturn(ret)
def test_managed_file_mode_file_exists_noreplace(self):
    """
    file.managed on an existing file with replace=False still changes
    its permissions
    """
    initial_mode = 0o770
    desired_mode = 0o600
    source = "salt://grail/scene33"
    name = self.tmp_dir / "grail_scene33"
    self.addCleanup(salt.utils.files.safe_rm, str(name))

    # First run: create the file with the initial mode.
    ret = self.run_state(
        "file.managed",
        name=str(name),
        replace=True,
        mode=oct(initial_mode),
        source=source,
    )

    if IS_WINDOWS:
        first_key = list(ret)[0]
        self.assertEqual(
            ret[first_key]["comment"],
            "The 'mode' option is not supported on Windows",
        )
        self.assertSaltFalseReturn(ret)
        return

    # Second run: contents must not be replaced, only the mode updated.
    ret = self.run_state(
        "file.managed",
        name=str(name),
        replace=False,
        mode=oct(desired_mode),
        source=source,
    )
    resulting_mode = stat.S_IMODE(name.stat().st_mode)
    self.assertEqual(oct(desired_mode), oct(resulting_mode))
    self.assertSaltTrueReturn(ret)
def test_managed_file_with_grains_data(self):
    """
    Grains data can be rendered into a managed file
    """
    grain_path = self.tmp_dir / "file-grain-test"
    self.addCleanup(salt.utils.files.safe_rm, str(grain_path))
    self.run_function(
        "state.sls", ["file-grainget"], pillar={"grain_path": str(grain_path)}
    )
    self.assertTrue(grain_path.exists())
    # Keep line endings so the pattern can require the trailing newline.
    first_line = grain_path.read_text().splitlines(True)[0]
    self.assertTrue(re.match("^minion\n", first_line))
def test_managed_file_with_pillardefault_sls(self):
    """
    When pillar data is unavailable, pillar.get in an sls file falls
    back to its default value
    """
    target = os.path.join(RUNTIME_VARS.TMP, "filepillar-defaultvalue")
    self.addCleanup(self._delete_file, target)
    log.warning("File Path: %s", target)

    state_ret = self.run_function("state.sls", ["file-pillardefaultget"])
    self.assertSaltTrueReturn(state_ret)

    # The state must have created the file
    self.assertTrue(self.run_function("file.file_exists", [target]))
@skip_if_not_root
def test_managed_dir_mode(self):
    """
    file.managed creates missing parent directories with the permissions
    given by dir_mode and the requested owner
    """
    desired_mode = 0o777
    desired_owner = "nobody"
    name = self.tmp_dir / "a" / "managed_dir_mode_test_file"
    self.addCleanup(salt.utils.files.safe_rm, str(name))

    ret = self.run_state(
        "file.managed",
        name=str(name),
        source="salt://grail/scene33",
        mode=600,
        makedirs=True,
        user=desired_owner,
        dir_mode=oct(desired_mode),
    )

    if IS_WINDOWS:
        first_key = list(ret)[0]
        self.assertEqual(
            ret[first_key]["comment"],
            "The 'mode' option is not supported on Windows",
        )
        self.assertSaltFalseReturn(ret)
        return

    # Inspect the parent directory that makedirs had to create.
    parent_dir = name.parent
    resulting_mode = stat.S_IMODE(parent_dir.stat().st_mode)
    resulting_owner = pwd.getpwuid(parent_dir.stat().st_uid).pw_name
    self.assertEqual(oct(desired_mode), oct(resulting_mode))
    self.assertSaltTrueReturn(ret)
    self.assertEqual(desired_owner, resulting_owner)
def test_test_managed(self):
    """
    file.managed run with test=True does not create the file
    """
    target = self.tmp_dir / "grail_not_not_scene33"
    self.addCleanup(salt.utils.files.safe_rm, str(target))
    state_ret = self.run_state(
        "file.managed", test=True, name=str(target), source="salt://grail/scene33"
    )
    self.assertSaltNoneReturn(state_ret)
    self.assertFalse(target.is_file())
def test_managed_show_changes_false(self):
    """
    file.managed with show_changes=False reports a placeholder instead of
    the diff
    """
    target = self.tmp_dir / "grail_not_scene33"
    self.addCleanup(salt.utils.files.safe_rm, str(target))
    # Pre-populate the file so that managing it produces a change.
    target.write_bytes(b"test_managed_show_changes_false\n")

    ret = self.run_state(
        "file.managed",
        name=str(target),
        source="salt://grail/scene33",
        show_changes=False,
    )

    state_result = next(iter(ret.values()))
    self.assertEqual("<show_changes=False>", state_result["changes"]["diff"])
def test_managed_show_changes_true(self):
    """
    file.managed without show_changes overridden includes a "diff" entry
    in the reported changes
    """
    target = self.tmp_dir / "grail_not_scene33"
    self.addCleanup(salt.utils.files.safe_rm, str(target))
    # Pre-populate the target before the state manages it.
    target.write_bytes(b"test_managed_show_changes_false\n")

    result = self.run_state(
        "file.managed",
        name=str(target),
        source="salt://grail/scene33",
    )

    state_ret = next(iter(result.values()))
    self.assertIn("diff", state_ret["changes"])
@skipIf(IS_WINDOWS, "Don't know how to fix for Windows")
def test_managed_escaped_file_path(self):
    """
    file.managed test that 'salt://|' protects unusual characters in file path
    """
    funny_file = salt.utils.files.mkstemp(
        prefix="?f!le? n@=3&", suffix=".file type"
    )
    funny_file_name = os.path.split(funny_file)[1]
    funny_url = "salt://|" + funny_file_name
    funny_url_path = os.path.join(RUNTIME_VARS.BASE_FILES, funny_file_name)

    state_name = "funny_file"
    state_file_name = state_name + ".sls"
    state_file = os.path.join(RUNTIME_VARS.BASE_FILES, state_file_name)
    state_key = "file_|-{0}_|-{0}_|-managed".format(funny_file)

    # Cleanups are registered before the files are written. Use the same
    # removal helper as the neighbouring tests (presumably tolerant of a
    # missing path) so an early failure is not masked by a second error
    # raised from cleanup.
    self.addCleanup(salt.utils.files.safe_rm, state_file)
    self.addCleanup(salt.utils.files.safe_rm, funny_file)
    self.addCleanup(salt.utils.files.safe_rm, funny_url_path)

    # Empty source file served from the base file root.
    with salt.utils.files.fopen(funny_url_path, "w"):
        pass

    with salt.utils.files.fopen(state_file, "w") as fp_:
        fp_.write(
            textwrap.dedent(
                """\
                {}:
                  file.managed:
                    - source: {}
                    - makedirs: True
                """.format(
                    funny_file, funny_url
                )
            )
        )

    ret = self.run_function("state.sls", [state_name])
    self.assertTrue(ret[state_key]["result"])
def test_managed_contents(self):
    """
    test file.managed with contents that is a boolean, string, integer,
    float, list, and dictionary
    """
    state_name = "file-FileTest-test_managed_contents"
    state_filename = state_name + ".sls"
    state_file = os.path.join(RUNTIME_VARS.BASE_FILES, state_filename)

    # One temp file and one expected return key per contents type.
    managed_files = {}
    state_keys = {}
    for typ in ("bool", "str", "int", "float", "list", "dict"):
        managed_files[typ] = salt.utils.files.mkstemp()
        state_keys[typ] = "file_|-{} file_|-{}_|-managed".format(
            typ, managed_files[typ]
        )

    try:
        with salt.utils.files.fopen(state_file, "w") as fd_:
            fd_.write(
                textwrap.dedent(
                    """\
                    bool file:
                      file.managed:
                        - name: {bool}
                        - contents: True

                    str file:
                      file.managed:
                        - name: {str}
                        - contents: Salt was here.

                    int file:
                      file.managed:
                        - name: {int}
                        - contents: 340282366920938463463374607431768211456

                    float file:
                      file.managed:
                        - name: {float}
                        - contents: 1.7518e-45  # gravitational coupling constant

                    list file:
                      file.managed:
                        - name: {list}
                        - contents: [1, 1, 2, 3, 5, 8, 13]

                    dict file:
                      file.managed:
                        - name: {dict}
                        - contents:
                            C: charge
                            P: parity
                            T: time
                    """.format(
                        **managed_files
                    )
                )
            )

        ret = self.run_function("state.sls", [state_name])
        self.assertSaltTrueReturn(ret)
        for state_key in state_keys.values():
            self.assertTrue(ret[state_key]["result"])
            self.assertIn("diff", ret[state_key]["changes"])
    finally:
        # Remove the SLS file and every managed temp file that still exists.
        if os.path.exists(state_file):
            os.remove(state_file)
        for managed_path in managed_files.values():
            if os.path.exists(managed_path):
                os.remove(managed_path)
def test_onchanges_any_recursive_error_issues_50811(self):
    """
    test that onchanges_any does not cause a recursive error
    """
    state_name = "onchanges_any_recursive_error"
    state_filename = state_name + ".sls"
    state_file = os.path.join(RUNTIME_VARS.BASE_FILES, state_filename)

    try:
        with salt.utils.files.fopen(state_file, "w") as fd_:
            fd_.write(
                textwrap.dedent(
                    """\
                    command-test:
                      cmd.run:
                        - name: ls
                        - onchanges_any:
                          - file: /tmp/an-unfollowed-file
                    """
                )
            )

        ret = self.run_function("state.sls", [state_name])
        # The run is expected to report a failed return.
        self.assertSaltFalseReturn(ret)
    finally:
        if os.path.exists(state_file):
            os.remove(state_file)
def test_prerequired_issues_55775(self):
    """
    Test that __prereqired__ is filtered from file.replace.
    If __prereqired__ is not filtered from file.replace an error will be
    raised.
    """
    state_name = "Test_Issues_55775"
    state_filename = state_name + ".sls"
    state_file = os.path.join(RUNTIME_VARS.BASE_FILES, state_filename)
    test_file = os.path.join(RUNTIME_VARS.BASE_FILES, "Issues_55775.txt")

    try:
        with salt.utils.files.fopen(state_file, "w") as fd_:
            fd_.write(
                textwrap.dedent(
                    """\
                    /tmp/bug.txt:
                      file.managed:
                        - name: {0}
                        - contents:
                          - foo
                    file.replace:
                      file.replace:
                        - name: {0}
                        - pattern: 'foo'
                        - repl: 'bar'
                        - prereq:
                          - test no changes
                          - test changes
                    test no changes:
                      test.succeed_without_changes:
                        - name: no changes
                    test changes:
                      test.succeed_with_changes:
                        - name: changes
                        - require:
                          - test: test no changes
                    """.format(
                        test_file
                    )
                )
            )

        ret = self.run_function("state.sls", [state_name])
        self.assertSaltTrueReturn(ret)
    finally:
        # Remove both the SLS file and the file the states manage.
        for fpath in (state_file, test_file):
            if os.path.exists(fpath):
                os.remove(fpath)
def test_managed_contents_with_contents_newline(self):
    """
    file.managed with single-line contents and contents_newline=True:
    the written file ends with a newline.
    """
    contents = "test_managed_contents_with_newline_one"
    target = self.tmp_dir / "foo"
    self.addCleanup(salt.utils.files.safe_rm, str(target))

    # Write the contents; a trailing "\n" is expected at EOF.
    self.run_state(
        "file.managed",
        name=str(target),
        contents=contents,
        contents_newline=True,
    )

    written = target.read_text()
    self.assertEqual((contents + "\n"), written)
def test_managed_contents_with_contents_newline_false(self):
    """
    file.managed with single-line contents and contents_newline=False:
    the written file is exactly the contents, with no trailing newline.
    """
    contents = "test_managed_contents_with_newline_one"
    target = self.tmp_dir / "bar"
    self.addCleanup(salt.utils.files.safe_rm, str(target))

    # Write the contents; nothing should be appended at EOF.
    self.run_state(
        "file.managed",
        name=str(target),
        contents=contents,
        contents_newline=False,
    )

    written = target.read_text()
    self.assertEqual(contents, written)
def test_managed_multiline_contents_with_contents_newline(self):
    """
    file.managed with multi-line contents and contents_newline=True:
    the written file ends with a newline.
    """
    contents = "this is a cookie\nthis is another cookie"
    target = self.tmp_dir / "bar"
    self.addCleanup(salt.utils.files.safe_rm, str(target))

    # Write the contents; a trailing "\n" is expected at EOF.
    self.run_state(
        "file.managed",
        name=str(target),
        contents=contents,
        contents_newline=True,
    )

    written = target.read_text()
    self.assertEqual((contents + "\n"), written)
def test_managed_multiline_contents_with_contents_newline_false(self):
    """
    file.managed with multi-line contents and contents_newline=False:
    the written file is exactly the contents, with no trailing newline.
    """
    contents = "this is a cookie\nthis is another cookie"
    target = self.tmp_dir / "bar"
    self.addCleanup(salt.utils.files.safe_rm, str(target))

    # Write the contents; nothing should be appended at EOF.
    self.run_state(
        "file.managed",
        name=str(target),
        contents=contents,
        contents_newline=False,
    )

    written = target.read_text()
    self.assertEqual(contents, written)
@skip_if_not_root
@skipIf(IS_WINDOWS, 'Windows does not support "mode" kwarg. Skipping.')
@skipIf(not salt.utils.path.which("visudo"), "sudo is missing")
def test_managed_check_cmd(self):
    """
    Test file.managed passing a basic check_cmd kwarg. See Issue #38111.
    """
    # macOS and FreeBSD use "wheel" as the group here instead of "root".
    if salt.utils.platform.is_darwin() or salt.utils.platform.is_freebsd():
        r_group = "wheel"
    else:
        r_group = "root"

    name = self.tmp_dir / "sudoers"
    self.addCleanup(salt.utils.files.safe_rm, str(name))

    ret = self.run_state(
        "file.managed",
        name=str(name),
        user="root",
        group=r_group,
        mode=440,
        check_cmd="visudo -c -s -f",
    )
    self.assertSaltTrueReturn(ret)
    self.assertInSaltComment("Empty file", ret)

    state_key = "file_|-{0}_|-{0}_|-managed".format(name)
    expected_changes = {"new": "file {} created".format(name), "mode": "0440"}
    self.assertEqual(ret[state_key]["changes"], expected_changes)
def test_managed_local_source_with_source_hash(self):
    """
    Make sure that we enforce the source_hash even with local files
    """
    name = self.tmp_dir / "local_source_with_source_hash"
    self.addCleanup(salt.utils.files.safe_rm, str(name))
    local_path = os.path.join(RUNTIME_VARS.BASE_FILES, "grail", "scene33")
    actual_hash = "567fd840bf1548edc35c48eb66cdd78bfdfcccff"
    if IS_WINDOWS:
        # CRLF vs LF causes a different hash on windows
        actual_hash = "f658a0ec121d9c17088795afcc6ff3c43cb9842a"
    # Reverse the actual hash
    bad_hash = actual_hash[::-1]

    def remove_file():
        """
        Remove the destination file, ignoring the case where it is absent
        """
        try:
            os.remove(str(name))
        except OSError as exc:
            if exc.errno != errno.ENOENT:
                raise

    def do_test(clean=False):
        """
        For each way of spelling the local source, check that a bad
        source_hash fails without changes and the real hash succeeds.
        With clean=True the destination is removed after every source.
        """
        for proto in ("file://", ""):
            source = proto + local_path
            log.debug("Trying source %s", source)
            try:
                ret = self.run_state(
                    "file.managed",
                    name=str(name),
                    source=source,
                    source_hash="sha1={}".format(bad_hash),
                )
                self.assertSaltFalseReturn(ret)
                ret = ret[next(iter(ret))]
                # Shouldn't be any changes
                self.assertFalse(ret["changes"])
                # Check that we identified a hash mismatch
                self.assertIn("does not match actual checksum", ret["comment"])

                ret = self.run_state(
                    "file.managed",
                    name=str(name),
                    source=source,
                    source_hash="sha1={}".format(actual_hash),
                )
                self.assertSaltTrueReturn(ret)
            finally:
                if clean:
                    remove_file()

    remove_file()
    log.debug("Trying with nonexistant destination file")
    # Clean between sources; otherwise the successful run for the first
    # source leaves the destination in place and the second source is no
    # longer tested against a missing file.
    do_test(clean=True)

    log.debug("Trying with destination file already present")
    name.write_text("")
    try:
        do_test(clean=False)
    finally:
        remove_file()
def test_managed_local_source_does_not_exist(self):
    """
    Make sure that we exit gracefully when a local source doesn't exist
    """
    target = self.tmp_dir / "local_source_does_not_exist"
    self.addCleanup(salt.utils.files.safe_rm, str(target))
    missing_path = os.path.join(RUNTIME_VARS.BASE_FILES, "grail", "scene99")

    # Try the source both as a file:// URL and as a bare path.
    for source in ("file://" + missing_path, "" + missing_path):
        log.debug("Trying source %s", source)
        ret = self.run_state("file.managed", name=str(target), source=source)
        self.assertSaltFalseReturn(ret)
        state_ret = ret[next(iter(ret))]
        # Shouldn't be any changes
        self.assertFalse(state_ret["changes"])
        # Check that the missing source was reported
        self.assertIn("does not exist", state_ret["comment"])
def test_managed_unicode_jinja_with_tojson_filter(self):
    """
    Using {{ varname }} with a list or dictionary which contains unicode
    types on Python 2 will result in Jinja rendering the "u" prefix on each
    string. This tests that using the "tojson" jinja filter will dump them
    to a format which can be successfully loaded by our YAML loader.

    The two lines that should end up being rendered are meant to test two
    issues that would trip up PyYAML if the "tojson" filter were not used:

    1. A unicode string type would be loaded as a unicode literal with the
       leading "u" as well as the quotes, rather than simply being loaded
       as the proper unicode type which matches the content of the string
       literal. In other words, u'foo' would be loaded literally as
       u"u'foo'". This test includes actual non-ascii unicode in one of the
       strings to confirm that this also handles these international
       characters properly.

    2. Any unicode string type (such as a URL) which contains a colon would
       cause a ScannerError in PyYAML, as it would be assumed to delimit a
       mapping node.

    Dumping the data structure to JSON using the "tojson" jinja filter
    should produce an inline data structure which is valid YAML and will be
    loaded properly by our YAML loader.
    """
    test_file = self.tmp_dir / "test-tojson.txt"
    self.addCleanup(salt.utils.files.safe_rm, str(test_file))

    ret = self.run_function(
        "state.apply",
        mods="tojson",
        pillar={"tojson-file": str(test_file)},
    )
    state_ret = ret[next(iter(ret))]
    assert state_ret["result"], state_ret

    # Compare the rendered file, decoded to text, with the expected lines.
    managed = salt.utils.stringutils.to_unicode(test_file.read_bytes())
    expected = dedent(
        """\
        Die Webseite ist https://saltstack.com.
        Der Zucker ist süß.
        """
    )
    assert managed == expected, "{!r} != {!r}".format(managed, expected)
def test_managed_source_hash_indifferent_case(self):
    """
    Test passing a source_hash as an uppercase hash.

    This is a regression test for Issue #38914 and Issue #48230 (test=true use).
    """
    name = self.tmp_dir / "source_hash_indifferent_case"
    self.addCleanup(salt.utils.files.safe_rm, str(name))
    state_name = "file_|-{0}_|-{0}_|-managed".format(name)
    local_path = os.path.join(RUNTIME_VARS.BASE_FILES, "hello_world.txt")

    if IS_WINDOWS:
        # CRLF vs LF causes a different hash on windows
        actual_hash = (
            "92b772380a3f8e27a93e57e6deeca6c01da07f5aadce78bb2fbb20de10a66925"
        )
    else:
        actual_hash = (
            "c98c24b677eff44860afea6f493bbaec5bb1c4cbb209c6fc2bbb47f66ff2ad31"
        )
    uppercase_hash = actual_hash.upper()

    # Lay down tmp file to test against
    self.run_state(
        "file.managed", name=str(name), source=local_path, source_hash=actual_hash
    )

    # Uppercase source_hash, first for a real run and then with test=True:
    # both should return True with no changes.
    for extra_kwargs in ({}, {"test": True}):
        ret = self.run_state(
            "file.managed",
            name=str(name),
            source=local_path,
            source_hash=uppercase_hash,
            **extra_kwargs
        )
        assert ret[state_name]["result"] is True
        assert ret[state_name]["changes"] == {}
@with_tempfile(create=False)
def test_managed_latin1_diff(self, name):
    """
    Tests that latin-1 file contents are represented properly in the diff
    """
    # Lay down the initial file
    first = self.run_state(
        "file.managed", name=name, source="salt://issue-48777/old.html"
    )
    first = first[next(iter(first))]
    assert first["result"] is True, first

    # Replace it with the new file and check the diff
    second = self.run_state(
        "file.managed", name=name, source="salt://issue-48777/new.html"
    )
    second = second[next(iter(second))]
    assert second["result"] is True, second

    diff_lines = second["changes"]["diff"].split(os.linesep)
    assert "+räksmörgås" in diff_lines, diff_lines
@with_tempfile()
def test_managed_keep_source_false_salt(self, name):
    """
    This test ensures that we properly clean the cached file if keep_source
    is set to False, for source files using a salt:// URL
    """
    source = "salt://grail/scene33"
    saltenv = "base"

    # Run the state
    state_ret = self.run_state(
        "file.managed",
        name=name,
        source=source,
        saltenv=saltenv,
        keep_source=False,
    )
    state_ret = state_ret[next(iter(state_ret))]
    assert state_ret["result"] is True

    # Now make sure that the file is not cached
    cached = self.run_function("cp.is_cached", [source, saltenv])
    assert cached == "", "File is still cached at {}".format(cached)
@with_tempfile(create=False)
@with_tempfile(create=False)
def test_file_managed_onchanges(self, file1, file2):
    """
    Test file.managed state with onchanges
    """
    pillar = {
        "file1": file1,
        "file2": file2,
        "source": "salt://testfile",
        "req": "onchanges",
    }

    # Lay down the file used in the below SLS to ensure that when it is
    # run, there are no changes.
    self.run_state("file.managed", name=pillar["file2"], source=pillar["source"])

    raw_ret = self.run_function(
        "state.apply", mods="onchanges_prereq", pillar=pillar, test=True,
    )
    ret = self.repack_state_returns(raw_ret)

    # In test mode the first file state reports None and the second,
    # which manages the file laid down above, reports True.
    assert ret["one"]["result"] is None, ret["one"]["result"]
    assert ret["three"]["result"] is True, ret["three"]["result"]

    # The first file state should have changes, since a new file was
    # created. The other one should not, since we already created that file
    # before applying the SLS file.
    assert ret["one"]["changes"]
    assert not ret["three"]["changes"], ret["three"]["changes"]

    # The state watching 'one' should have been run due to changes
    assert ret["two"]["comment"] == "Success!", ret["two"]["comment"]

    # The state watching 'three' should not have been run
    not_run_comment = "State was not run because none of the onchanges reqs changed"
    assert ret["four"]["comment"] == not_run_comment, ret["four"]["comment"]
@with_tempfile(create=False)
@with_tempfile(create=False)
def test_file_managed_prereq(self, file1, file2):
    """
    Test file.managed state with prereq
    """
    pillar = {
        "file1": file1,
        "file2": file2,
        "source": "salt://testfile",
        "req": "prereq",
    }

    # Lay down the file used in the below SLS to ensure that when it is
    # run, there are no changes.
    self.run_state("file.managed", name=pillar["file2"], source=pillar["source"])

    raw_ret = self.run_function(
        "state.apply", mods="onchanges_prereq", pillar=pillar, test=True,
    )
    ret = self.repack_state_returns(raw_ret)

    # In test mode the first file state reports None and the second,
    # which manages the file laid down above, reports True.
    assert ret["one"]["result"] is None, ret["one"]["result"]
    assert ret["three"]["result"] is True, ret["three"]["result"]

    # The first file state should have changes, since a new file was
    # created. The other one should not, since we already created that file
    # before applying the SLS file.
    assert ret["one"]["changes"]
    assert not ret["three"]["changes"], ret["three"]["changes"]

    # The state watching 'one' should have been run due to changes
    assert ret["two"]["comment"] == "Success!", ret["two"]["comment"]

    # The state watching 'three' should not have been run
    assert ret["four"]["comment"] == "No changes detected", ret["four"]["comment"]
def test_directory(self):
    """
    file.directory creates the requested directory
    """
    target = self.tmp_dir / "a_new_dir"
    self.addCleanup(salt.utils.files.rm_rf, str(target))
    result = self.run_state("file.directory", name=str(target))
    self.assertSaltTrueReturn(result)
    self.assertTrue(target.is_dir())
def test_directory_symlink_dry_run(self):
    """
    Ensure that symlinks are followed when file.directory is run with
    test=True
    """
    tmp_dir = self.tmp_dir / "pgdata"
    self.addCleanup(salt.utils.files.rm_rf, str(tmp_dir))
    sym_dir = self.tmp_dir / "pg_data"
    self.addCleanup(salt.utils.files.safe_rm, str(sym_dir))

    tmp_dir.mkdir(0o0700)
    sym_dir.symlink_to(tmp_dir, target_is_directory=IS_WINDOWS)

    # Only the ownership/mode argument differs between platforms.
    if IS_WINDOWS:
        platform_kwargs = {"win_owner": "Administrators"}
    else:
        platform_kwargs = {"mode": 700}

    ret = self.run_state(
        "file.directory",
        test=True,
        name=str(sym_dir),
        follow_symlinks=True,
        **platform_kwargs
    )
    self.assertSaltTrueReturn(ret)
@requires_system_grains
@skip_if_not_root
@skipIf(IS_WINDOWS, "Mode not available in Windows")
def test_directory_max_depth(self, grains):
    """
    file.directory

    Test the max_depth option by iteratively increasing the depth and
    checking that no changes deeper than max_depth have been attempted
    """

    def _get_oct_mode(name):
        """
        Return a string octal representation of the permissions for name
        """
        permission_bits = os.stat(name).st_mode & 0o777
        return salt.utils.files.normalize_mode(oct(permission_bits))

    top = os.path.join(RUNTIME_VARS.TMP, "top_dir")
    self.addCleanup(salt.utils.files.rm_rf, top)
    sub = os.path.join(top, "sub_dir")
    subsub = os.path.join(sub, "sub_sub_dir")
    dirs = [top, sub, subsub]

    initial_mode = "0111"
    changed_mode = "0555"

    # Expected modes of the not-yet-touched directories, keyed by depth.
    if grains["os_family"] in ("VMware Photon OS",):
        initial_modes = {
            0: {sub: "0750", subsub: "0110"},
            1: {sub: "0110", subsub: "0110"},
            2: {sub: "0110", subsub: "0110"},
        }
    else:
        initial_modes = {
            0: {sub: "0755", subsub: "0111"},
            1: {sub: "0111", subsub: "0111"},
            2: {sub: "0111", subsub: "0111"},
        }

    if not os.path.isdir(subsub):
        os.makedirs(subsub, int(initial_mode, 8))

    for depth in range(0, 3):
        ret = self.run_state(
            "file.directory",
            name=top,
            max_depth=depth,
            dir_mode=changed_mode,
            recurse=["mode"],
        )
        self.assertSaltTrueReturn(ret)

        for changed_dir in dirs[0 : depth + 1]:
            self.assertEqual(changed_mode, _get_oct_mode(changed_dir))

        for untouched_dir in dirs[depth + 1 :]:
            # Beginning in Python 3.7, os.makedirs no longer sets
            # the mode of intermediate directories to the mode that
            # is passed.
            if sys.version_info >= (3, 7):
                expected_mode = initial_modes[depth][untouched_dir]
            else:
                expected_mode = initial_mode
            self.assertEqual(expected_mode, _get_oct_mode(untouched_dir))
def test_test_directory(self):
    """
    file.directory with test=True: the state returns None and the
    directory is not created
    """
    target = self.tmp_dir / "a_not_dir"
    self.addCleanup(shutil.rmtree, str(target), ignore_errors=True)
    result = self.run_state("file.directory", test=True, name=str(target))
    self.assertSaltNoneReturn(result)
    self.assertFalse(target.is_dir())
@with_tempdir()
def test_directory_clean(self, base_dir):
    """
    file.directory with clean=True
    """
    name = os.path.join(base_dir, "directory_clean_dir")
    os.mkdir(name)

    # Populate the directory with a stray file and a stray sub-directory
    # that itself contains a file.
    strayfile = os.path.join(name, "strayfile")
    with salt.utils.files.fopen(strayfile, "w"):
        pass

    straydir = os.path.join(name, "straydir")
    if not os.path.isdir(straydir):
        os.makedirs(straydir)

    nested_stray = os.path.join(straydir, "strayfile2")
    with salt.utils.files.fopen(nested_stray, "w"):
        pass

    ret = self.run_state("file.directory", name=name, clean=True)
    self.assertSaltTrueReturn(ret)

    # The strays are gone; the directory itself remains.
    self.assertFalse(os.path.exists(strayfile))
    self.assertFalse(os.path.exists(straydir))
    self.assertTrue(os.path.isdir(name))
def test_directory_is_idempotent(self):
    """
    Ensure the file.directory state produces no changes when rerun.
    """
    name = self.tmp_dir / "a_dir_twice"
    self.addCleanup(salt.utils.files.rm_rf, str(name))

    # On Windows the first run names the owner as DOMAIN\user and the
    # rerun uses the bare user name; elsewhere no owner is passed.
    if IS_WINDOWS:
        username = os.environ.get("USERNAME", "Administrators")
        domain = os.environ.get("USERDOMAIN", "")
        fullname = "{}\\{}".format(domain, username)
        first_kwargs = {"win_owner": fullname}
        rerun_kwargs = {"win_owner": username}
    else:
        first_kwargs = {}
        rerun_kwargs = {}

    ret = self.run_state("file.directory", name=str(name), **first_kwargs)
    self.assertSaltTrueReturn(ret)

    ret = self.run_state("file.directory", name=str(name), **rerun_kwargs)
    self.assertSaltTrueReturn(ret)
    self.assertSaltStateChangesEqual(ret, {})
@with_tempdir()
def test_directory_clean_exclude(self, base_dir):
    """
    file.directory with clean=True and exclude_pat set
    """
    name = os.path.join(base_dir, "directory_clean_dir")
    if not os.path.isdir(name):
        os.makedirs(name)

    straydir = os.path.join(name, "straydir")
    strayfile = os.path.join(name, "strayfile")
    strayfile2 = os.path.join(straydir, "strayfile2")
    keepfile = os.path.join(straydir, "keepfile")

    # Create one stray file at the top level, then the sub-directory with
    # one stray file and one file the pattern protects.
    with salt.utils.files.fopen(strayfile, "w"):
        pass
    if not os.path.isdir(straydir):
        os.makedirs(straydir)
    for path in (strayfile2, keepfile):
        with salt.utils.files.fopen(path, "w"):
            pass

    # The path separator inside the pattern differs on Windows.
    if IS_WINDOWS:
        exclude_pat = "E@^straydir(|\\\\keepfile)$"
    else:
        exclude_pat = "E@^straydir(|/keepfile)$"

    ret = self.run_state(
        "file.directory", name=name, clean=True, exclude_pat=exclude_pat
    )
    self.assertSaltTrueReturn(ret)
    self.assertFalse(os.path.exists(strayfile))
    self.assertFalse(os.path.exists(strayfile2))
    self.assertTrue(os.path.exists(keepfile))
@skipIf(IS_WINDOWS, "Skip on windows")
@with_tempdir()
def test_test_directory_clean_exclude(self, base_dir):
    """
    file.directory with test=True, clean=True and exclude_pat set

    Skipped on windows because clean and exclude_pat not supported by
    salt.states.file._check_directory_win
    """
    name = os.path.join(base_dir, "directory_clean_dir")
    os.mkdir(name)

    strayfile = os.path.join(name, "strayfile")
    with salt.utils.files.fopen(strayfile, "w"):
        pass

    straydir = os.path.join(name, "straydir")
    if not os.path.isdir(straydir):
        os.makedirs(straydir)

    strayfile2 = os.path.join(straydir, "strayfile2")
    with salt.utils.files.fopen(strayfile2, "w"):
        pass

    keepfile = os.path.join(straydir, "keepfile")
    with salt.utils.files.fopen(keepfile, "w"):
        pass

    # No Windows variant of the pattern is needed: the test is skipped there.
    exclude_pat = "E@^straydir(|/keepfile)$"

    ret = self.run_state(
        "file.directory", test=True, name=name, clean=True, exclude_pat=exclude_pat
    )

    comment = next(iter(ret.values()))["comment"]

    self.assertSaltNoneReturn(ret)
    # test=True must leave every file in place ...
    self.assertTrue(os.path.exists(strayfile))
    self.assertTrue(os.path.exists(strayfile2))
    self.assertTrue(os.path.exists(keepfile))

    # ... while the comment lists only the files that would be removed.
    self.assertIn(strayfile, comment)
    self.assertIn(strayfile2, comment)
    self.assertNotIn(keepfile, comment)
@with_tempdir()
def test_directory_clean_require_in(self, name):
    """
    file.directory test with clean=True and require_in file
    """
    state_name = "file-FileTest-test_directory_clean_require_in"
    state_filename = state_name + ".sls"
    state_file = os.path.join(RUNTIME_VARS.BASE_FILES, state_filename)

    # A file no state manages; the clean is expected to remove it.
    wrong_file = os.path.join(name, "wrong")
    with salt.utils.files.fopen(wrong_file, "w") as fp:
        fp.write("foo")

    good_file = os.path.join(name, "bar")

    with salt.utils.files.fopen(state_file, "w") as fp:
        self.addCleanup(salt.utils.files.safe_rm, state_file)
        fp.write(
            textwrap.dedent(
                """\
                some_dir:
                  file.directory:
                    - name: {name}
                    - clean: true

                {good_file}:
                  file.managed:
                    - require_in:
                      - file: some_dir
                """.format(
                    name=name, good_file=good_file
                )
            )
        )

    self.run_function("state.sls", [state_name])

    self.assertTrue(os.path.exists(good_file))
    self.assertFalse(os.path.exists(wrong_file))
@with_tempdir()
def test_directory_clean_require_in_with_id(self, name):
    """
    file.directory test with clean=True and require_in file with an ID
    different from the file name
    """
    state_name = "file-FileTest-test_directory_clean_require_in_with_id"
    state_filename = state_name + ".sls"
    state_file = os.path.join(RUNTIME_VARS.BASE_FILES, state_filename)

    # A file no state manages; the clean is expected to remove it.
    wrong_file = os.path.join(name, "wrong")
    with salt.utils.files.fopen(wrong_file, "w") as fp:
        fp.write("foo")

    good_file = os.path.join(name, "bar")

    with salt.utils.files.fopen(state_file, "w") as fp:
        self.addCleanup(salt.utils.files.safe_rm, state_file)
        fp.write(
            textwrap.dedent(
                """\
                some_dir:
                  file.directory:
                    - name: {name}
                    - clean: true

                some_file:
                  file.managed:
                    - name: {good_file}
                    - require_in:
                      - file: some_dir
                """.format(
                    name=name, good_file=good_file
                )
            )
        )

    self.run_function("state.sls", [state_name])

    self.assertTrue(os.path.exists(good_file))
    self.assertFalse(os.path.exists(wrong_file))
@skipIf(
    salt.utils.platform.is_darwin(),
    "WAR ROOM TEMPORARY SKIP, Test is flaky on macosx",
)
@with_tempdir()
def test_directory_clean_require_with_name(self, name):
    """
    file.directory test with clean=True and require with a file state
    relatively to the state's name, not its ID.
    """
    # Use this test's own SLS name so it does not write the same file as
    # test_directory_clean_require_in_with_id.
    state_name = "file-FileTest-test_directory_clean_require_with_name"
    state_filename = state_name + ".sls"
    state_file = os.path.join(RUNTIME_VARS.BASE_FILES, state_filename)

    # A file no state manages; the clean is expected to remove it.
    wrong_file = os.path.join(name, "wrong")
    with salt.utils.files.fopen(wrong_file, "w") as fp:
        fp.write("foo")

    good_file = os.path.join(name, "bar")

    with salt.utils.files.fopen(state_file, "w") as fp:
        self.addCleanup(salt.utils.files.safe_rm, state_file)
        fp.write(
            textwrap.dedent(
                """\
                some_dir:
                  file.directory:
                    - name: {name}
                    - clean: true
                    - require:
                      # This requirement refers to the name of the following
                      # state, not its ID.
                      - file: {good_file}

                some_file:
                  file.managed:
                    - name: {good_file}
                """.format(
                    name=name, good_file=good_file
                )
            )
        )

    self.run_function("state.sls", [state_name])

    self.assertTrue(os.path.exists(good_file))
    self.assertFalse(os.path.exists(wrong_file))
def test_directory_broken_symlink(self):
    """
    Ensure that file.directory works even if a directory
    contains broken symbolic link
    """
    tmp_dir = self.tmp_dir / "foo"
    tmp_dir.mkdir(0o700)
    self.addCleanup(salt.utils.files.rm_rf, str(tmp_dir))

    # "null" is never created, so the link pointing at it is broken.
    null_file = tmp_dir / "null"
    broken_link = tmp_dir / "broken"
    broken_link.symlink_to(null_file)

    if IS_WINDOWS:
        platform_kwargs = {"follow_symlinks": True, "win_owner": "Administrators"}
    else:
        platform_kwargs = {"file_mode": 644, "dir_mode": 755}

    ret = self.run_state(
        "file.directory",
        name=str(tmp_dir),
        recurse=["mode"],
        **platform_kwargs
    )
    self.assertSaltTrueReturn(ret)
@with_tempdir(create=False)
def test_recurse(self, name):
    """
    file.recurse copies the salt://grail tree into the target directory
    """
    result = self.run_state("file.recurse", name=name, source="salt://grail")
    self.assertSaltTrueReturn(result)
    copied_file = os.path.join(name, "36", "scene")
    self.assertTrue(os.path.isfile(copied_file))
@with_tempdir(create=False)
@with_tempdir(create=False)
def test_recurse_specific_env(self, dir1, dir2):
    """
    file.recurse selecting the "prod" environment, once through __env__
    and once through saltenv
    """
    cases = (
        (dir1, {"__env__": "prod"}),
        (dir2, {"saltenv": "prod"}),
    )
    for target, env_kwargs in cases:
        ret = self.run_state(
            "file.recurse", name=target, source="salt://holy", **env_kwargs
        )
        self.assertSaltTrueReturn(ret)
        self.assertTrue(os.path.isfile(os.path.join(target, "32", "scene")))
@with_tempdir(create=False)
@with_tempdir(create=False)
def test_recurse_specific_env_in_url(self, dir1, dir2):
    """
    file.recurse with the environment given as a ``?saltenv=prod`` query
    on the source URL
    """
    for target in (dir1, dir2):
        ret = self.run_state(
            "file.recurse", name=target, source="salt://holy?saltenv=prod"
        )
        self.assertSaltTrueReturn(ret)
        self.assertTrue(os.path.isfile(os.path.join(target, "32", "scene")))
@with_tempdir(create=False)
def test_test_recurse(self, name):
    """
    file.recurse with test=True: the state returns None and nothing is
    written to the target
    """
    result = self.run_state(
        "file.recurse",
        test=True,
        name=name,
        source="salt://grail",
    )
    self.assertSaltNoneReturn(result)
    self.assertFalse(os.path.isfile(os.path.join(name, "36", "scene")))
    self.assertFalse(os.path.exists(name))
@with_tempdir(create=False)
@with_tempdir(create=False)
def test_test_recurse_specific_env(self, dir1, dir2):
    """
    file.recurse with test=True against the "prod" environment, selected
    once through __env__ and once through saltenv
    """
    cases = (
        (dir1, {"__env__": "prod"}),
        (dir2, {"saltenv": "prod"}),
    )
    for target, env_kwargs in cases:
        ret = self.run_state(
            "file.recurse",
            test=True,
            name=target,
            source="salt://holy",
            **env_kwargs
        )
        self.assertSaltNoneReturn(ret)
        self.assertFalse(os.path.isfile(os.path.join(target, "32", "scene")))
        self.assertFalse(os.path.exists(target))
@with_tempdir(create=False)
def test_recurse_template(self, name):
    """
    file.recurse with jinja template enabled
    """
    marker = "TEMPLATE TEST STRING"
    ret = self.run_state(
        "file.recurse",
        name=name,
        source="salt://grail",
        template="jinja",
        defaults={"spam": marker},
    )
    self.assertSaltTrueReturn(ret)

    # The value passed through ``defaults`` must appear in the copied file.
    rendered_path = os.path.join(name, "scene33")
    with salt.utils.files.fopen(rendered_path, "r") as fp_:
        contents = fp_.read()
    self.assertIn(marker, contents)
@with_tempdir()
def test_recurse_clean(self, name):
    """
    file.recurse with clean=True
    """
    strayfile = os.path.join(name, "strayfile")
    # Corner cases: replacing file with a directory and vice versa
    file_in_place_of_dir = os.path.join(name, "36")
    dir_in_place_of_file = os.path.join(name, "scene33")

    for path in (strayfile, file_in_place_of_dir):
        with salt.utils.files.fopen(path, "w"):
            pass
    os.makedirs(dir_in_place_of_file)

    ret = self.run_state(
        "file.recurse", name=name, source="salt://grail", clean=True
    )
    self.assertSaltTrueReturn(ret)
    self.assertFalse(os.path.exists(strayfile))
    self.assertTrue(os.path.isfile(os.path.join(name, "36", "scene")))
    self.assertTrue(os.path.isfile(os.path.join(name, "scene33")))
@with_tempdir()
def test_recurse_clean_specific_env(self, name):
    """
    file.recurse with clean=True and __env__=prod
    """
    strayfile = os.path.join(name, "strayfile")
    # Corner cases: replacing file with a directory and vice versa
    file_in_place_of_dir = os.path.join(name, "32")
    dir_in_place_of_file = os.path.join(name, "scene34")

    for path in (strayfile, file_in_place_of_dir):
        with salt.utils.files.fopen(path, "w"):
            pass
    os.makedirs(dir_in_place_of_file)

    ret = self.run_state(
        "file.recurse", name=name, source="salt://holy", clean=True, __env__="prod"
    )
    self.assertSaltTrueReturn(ret)
    self.assertFalse(os.path.exists(strayfile))
    self.assertTrue(os.path.isfile(os.path.join(name, "32", "scene")))
    self.assertTrue(os.path.isfile(os.path.join(name, "scene34")))
@skipIf(IS_WINDOWS, "Skip on windows")
@with_tempdir()
def test_recurse_issue_34945(self, base_dir):
    """
    Recurse a source dir that holds only subdirectories (no files) while
    managing dir_mode.

    For a long time this corner case left the top level of the destination
    directory with the wrong initial permissions. Normally that is repaired
    later in file.recurse by running file.directory, but file.directory is
    only called when there are files to manage in that directory, so with a
    subdirectory-only source the bad permissions were never corrected.

    This was fixed in https://github.com/saltstack/salt/pull/35309

    Skipped on windows because dir_mode is not supported.
    """
    issue_dir = "issue-34945"
    dir_mode = "2775"
    name = os.path.join(base_dir, issue_dir)
    ret = self.run_state(
        "file.recurse", name=name, source="salt://" + issue_dir, dir_mode=dir_mode
    )
    self.assertSaltTrueReturn(ret)
    # Compare the last four octal digits of the permission bits
    permission_bits = stat.S_IMODE(os.stat(name).st_mode)
    actual_dir_mode = oct(permission_bits)[-4:]
    self.assertEqual(dir_mode, actual_dir_mode)
@with_tempdir(create=False)
def test_recurse_issue_40578(self, name):
    """
    The state must not raise when file.source_list comes across a file
    with a unicode filename.
    """
    ret = self.run_state("file.recurse", name=name, source="salt://соль")
    self.assertSaltTrueReturn(ret)
    if not (six.PY2 and IS_WINDOWS):
        files = salt.utils.data.decode(os.listdir(name), normalize=True)
    else:
        # On windows python 2, hand os.listdir a unicode path so it does
        # not try to decode the filenames with the system encoding.
        files = os.listdir(name.decode("utf-8"))
    expected = sorted(["foo.txt", "спам.txt", "яйца.txt"])
    self.assertEqual(sorted(files), expected)
@with_tempfile()
def test_replace(self, name):
    """
    file.replace

    Seeds the file with "change_me", replaces "change" with "salt" and
    checks both the file content and the state result.
    """
    with salt.utils.files.fopen(name, "w+") as handle:
        handle.write("change_me")
    ret = self.run_state(
        "file.replace", name=name, pattern="change", repl="salt", backup=False
    )
    with salt.utils.files.fopen(name, "r") as handle:
        replaced = handle.read()
        self.assertIn("salt", replaced)
    self.assertSaltTrueReturn(ret)
@with_tempdir()
def test_replace_issue_18612(self, base_dir):
    """
    Test the (mis-)behaviour of file.replace as described in #18612:

    Using 'prepend_if_not_found' or 'append_if_not_found' resulted in
    an infinitely growing file as 'file.replace' didn't check beforehand
    whether the changes had already been done to the file

    # Case description:

    The tested file contains one commented line
    The commented line should be uncommented in the end, nothing else should change
    """
    test_name = "test_replace_issue_18612"
    path_test = os.path.join(base_dir, test_name)
    with salt.utils.files.fopen(path_test, "w+") as fp_test_:
        fp_test_.write("# en_US.UTF-8")
    # Apply the very same state three times in a row; runs after the first
    # must not grow the file.
    ret = [
        self.run_state(
            "file.replace",
            name=path_test,
            pattern="^# en_US.UTF-8$",
            repl="en_US.UTF-8",
            append_if_not_found=True,
        )
        for _ in range(3)
    ]
    # ensure the number of lines didn't change, even after invoking
    # 'file.replace' 3 times (assertEqual reports the real count on failure)
    with salt.utils.files.fopen(path_test, "r") as fp_test_:
        line_count = sum(1 for _ in fp_test_)
    self.assertEqual(line_count, 1)
    # ensure the replacement succeeded
    with salt.utils.files.fopen(path_test, "r") as fp_test_:
        self.assertTrue(fp_test_.read().startswith("en_US.UTF-8"))
    # ensure all runs of 'file.replace' reported success
    for item in ret:
        self.assertSaltTrueReturn(item)
@with_tempdir()
def test_replace_issue_18612_prepend(self, base_dir):
    """
    Test the (mis-)behaviour of file.replace as described in #18612:

    'prepend_if_not_found' / 'append_if_not_found' used to make the file
    grow on every run because 'file.replace' did not check whether the
    change had already been applied.

    # Case description:

    The tested multifile contains multiple lines not matching the pattern
    or replacement in any way
    The replacement pattern should be prepended to the file
    """
    test_name = "test_replace_issue_18612_prepend"
    fixture_dir = os.path.join(RUNTIME_VARS.FILES, "file.replace")
    path_in = os.path.join(fixture_dir, "{}.in".format(test_name))
    path_out = os.path.join(fixture_dir, "{}.out".format(test_name))
    path_test = os.path.join(base_dir, test_name)
    # start from a copy of the input fixture
    shutil.copyfile(path_in, path_test)
    # run the identical state three times
    results = [
        self.run_state(
            "file.replace",
            name=path_test,
            pattern="^# en_US.UTF-8$",
            repl="en_US.UTF-8",
            prepend_if_not_found=True,
        )
        for _ in range(0, 3)
    ]
    # the result must equal the expected-output fixture
    self.assertTrue(filecmp.cmp(path_test, path_out))
    # the backup must equal the untouched input fixture
    self.assertTrue(filecmp.cmp(path_test + ".bak", path_in))
    # every run must have reported success
    for result in results:
        self.assertSaltTrueReturn(result)
@with_tempdir()
def test_replace_issue_18612_append(self, base_dir):
    """
    Test the (mis-)behaviour of file.replace as described in #18612:

    'prepend_if_not_found' / 'append_if_not_found' used to make the file
    grow on every run because 'file.replace' did not check whether the
    change had already been applied.

    # Case description:

    The tested multifile contains multiple lines not matching the pattern
    or replacement in any way
    The replacement pattern should be appended to the file
    """
    test_name = "test_replace_issue_18612_append"
    fixture_dir = os.path.join(RUNTIME_VARS.FILES, "file.replace")
    path_in = os.path.join(fixture_dir, "{}.in".format(test_name))
    path_out = os.path.join(fixture_dir, "{}.out".format(test_name))
    path_test = os.path.join(base_dir, test_name)
    # start from a copy of the input fixture
    shutil.copyfile(path_in, path_test)
    # run the identical state three times
    results = [
        self.run_state(
            "file.replace",
            name=path_test,
            pattern="^# en_US.UTF-8$",
            repl="en_US.UTF-8",
            append_if_not_found=True,
        )
        for _ in range(0, 3)
    ]
    # the result must equal the expected-output fixture
    self.assertTrue(filecmp.cmp(path_test, path_out))
    # the backup must equal the untouched input fixture
    self.assertTrue(filecmp.cmp(path_test + ".bak", path_in))
    # every run must have reported success
    for result in results:
        self.assertSaltTrueReturn(result)
@with_tempdir()
def test_replace_issue_18612_append_not_found_content(self, base_dir):
    """
    Test the (mis-)behaviour of file.replace as described in #18612:

    'prepend_if_not_found' / 'append_if_not_found' used to make the file
    grow on every run because 'file.replace' did not check whether the
    change had already been applied.

    # Case description:

    The tested multifile contains multiple lines not matching the pattern
    or replacement in any way
    The 'not_found_content' value should be appended to the file
    """
    test_name = "test_replace_issue_18612_append_not_found_content"
    fixture_dir = os.path.join(RUNTIME_VARS.FILES, "file.replace")
    path_in = os.path.join(fixture_dir, "{}.in".format(test_name))
    path_out = os.path.join(fixture_dir, "{}.out".format(test_name))
    path_test = os.path.join(base_dir, test_name)
    # start from a copy of the input fixture
    shutil.copyfile(path_in, path_test)
    # run the identical state three times
    results = [
        self.run_state(
            "file.replace",
            name=path_test,
            pattern="^# en_US.UTF-8$",
            repl="en_US.UTF-8",
            append_if_not_found=True,
            not_found_content="THIS LINE WASN'T FOUND! SO WE'RE APPENDING IT HERE!",
        )
        for _ in range(0, 3)
    ]
    # the result must equal the expected-output fixture
    self.assertTrue(filecmp.cmp(path_test, path_out))
    # the backup must equal the untouched input fixture
    self.assertTrue(filecmp.cmp(path_test + ".bak", path_in))
    # every run must have reported success
    for result in results:
        self.assertSaltTrueReturn(result)
@with_tempdir()
def test_replace_issue_18612_change_mid_line_with_comment(self, base_dir):
    """
    Test the (mis-)behaviour of file.replace as described in #18612:

    'prepend_if_not_found' / 'append_if_not_found' used to make the file
    grow on every run because 'file.replace' did not check whether the
    change had already been applied.

    # Case description:

    The tested file contains 5 key=value pairs
    The commented key=value pair #foo=bar should be changed to foo=salt
    The comment char (#) in front of foo=bar should be removed
    """
    test_name = "test_replace_issue_18612_change_mid_line_with_comment"
    fixture_dir = os.path.join(RUNTIME_VARS.FILES, "file.replace")
    path_in = os.path.join(fixture_dir, "{}.in".format(test_name))
    path_out = os.path.join(fixture_dir, "{}.out".format(test_name))
    path_test = os.path.join(base_dir, test_name)
    # start from a copy of the input fixture
    shutil.copyfile(path_in, path_test)
    # run the identical state three times
    results = [
        self.run_state(
            "file.replace",
            name=path_test,
            pattern="^#foo=bar($|(?=\r\n))",
            repl="foo=salt",
            append_if_not_found=True,
        )
        for _ in range(0, 3)
    ]
    # the result must equal the expected-output fixture
    self.assertTrue(filecmp.cmp(path_test, path_out))
    # the backup must equal the untouched input fixture
    self.assertTrue(filecmp.cmp(path_test + ".bak", path_in))
    # every run must have reported success
    for result in results:
        self.assertSaltTrueReturn(result)
@with_tempdir()
def test_replace_issue_18841_no_changes(self, base_dir):
    """
    Test the (mis-)behaviour of file.replace as described in #18841:

    Using file.replace in a way which shouldn't modify the file at all
    results in changed mtime of the original file and a backup file being created.

    # Case description

    The tested file contains multiple lines
    The tested file contains a line already matching the replacement (no change needed)
    The tested file's content shouldn't change at all
    The tested file's mtime shouldn't change at all
    No backup file should be created
    """
    test_name = "test_replace_issue_18841_no_changes"
    path_in = os.path.join(
        RUNTIME_VARS.FILES, "file.replace", "{}.in".format(test_name)
    )
    path_test = os.path.join(base_dir, test_name)
    # create test file based on initial template
    shutil.copyfile(path_in, path_test)
    # get (m|a)time of file
    fstats_orig = os.stat(path_test)
    # define how far we predate the file
    age = 5 * 24 * 60 * 60
    # set (a|m)time of file 5 days into the past; os.utime takes the pair
    # in (atime, mtime) order
    os.utime(path_test, (fstats_orig.st_atime - age, fstats_orig.st_mtime - age))
    # Read the backdated mtime back from disk instead of recomputing it, so
    # float rounding or filesystem timestamp granularity cannot produce a
    # spurious mismatch in the comparison below.
    mtime_before = os.stat(path_test).st_mtime
    ret = self.run_state(
        "file.replace",
        name=path_test,
        pattern="^hello world$",
        repl="goodbye world",
        show_changes=True,
        flags=["IGNORECASE"],
        backup=False,
    )
    # get (m|a)time of file
    fstats_post = os.stat(path_test)
    # ensure, the file content didn't change
    self.assertTrue(filecmp.cmp(path_in, path_test))
    # ensure no backup file was created
    self.assertFalse(os.path.exists(path_test + ".bak"))
    # ensure the file's mtime didn't change (a two-argument assertTrue
    # would treat the second value as the failure message and always pass)
    self.assertEqual(fstats_post.st_mtime, mtime_before)
    # ensure, all 'file.replace' runs reported success
    self.assertSaltTrueReturn(ret)
def test_serialize(self):
    """
    file.serialize must write data that is both serialized and formatted
    as expected (JSON formatter, keys in sorted order).
    """
    # NOTE(review): whitespace inside the expected-output literals below
    # looks collapsed in this copy of the file - confirm against upstream.
    path_test = self.tmp_dir / "test_serialize"
    self.addCleanup(salt.utils.files.safe_rm, str(path_test))
    dataset = {
        "name": "naive",
        "description": "A basic test",
        "a_list": ["first_element", "second_element"],
        "finally": "the last item",
    }
    ret = self.run_state(
        "file.serialize", name=str(path_test), dataset=dataset, formatter="json",
    )
    serialized_file = salt.utils.stringutils.to_unicode(path_test.read_bytes())
    # The JSON serializer uses LF even on OSes where os.sep is CRLF.
    expected_lines = [
        "{",
        ' "a_list": [',
        ' "first_element",',
        ' "second_element"',
        " ],",
        ' "description": "A basic test",',
        ' "finally": "the last item",',
        ' "name": "naive"',
        "}",
        "",
    ]
    expected_file = "\n".join(expected_lines)
    self.assertEqual(serialized_file, expected_file)
@with_tempfile(create=False)
def test_serializer_deserializer_opts(self, name):
    """
    Test the serializer_opts and deserializer_opts options
    """
    data1 = {"foo": {"bar": "%(x)s"}}
    data2 = {"foo": {"abc": 123}}
    merged = {"foo": {"y": "not_used", "x": "baz", "abc": 123, "bar": "baz"}}

    # First run: deserializer_opts without merge_if_exists
    first_run = self.run_state(
        "file.serialize",
        name=name,
        dataset=data1,
        formatter="configparser",
        deserializer_opts=[{"defaults": {"y": "not_used"}}],
    )
    first_run = first_run[next(iter(first_run))]
    assert first_run["result"], first_run
    # deserializer_opts given while merge_if_exists is not True is
    # expected to produce a warning.
    assert "warnings" in first_run

    # Second run: merge_if_exists plus both option sets. The deserializer
    # defaults drive interpolation of the %(x)s written by the first run,
    # so "bar" should come back as "baz".
    second_run = self.run_state(
        "file.serialize",
        name=name,
        dataset=data2,
        formatter="configparser",
        merge_if_exists=True,
        serializer_opts=[{"defaults": {"y": "not_used"}}],
        deserializer_opts=[{"defaults": {"x": "baz"}}],
    )
    second_run = second_run[next(iter(second_run))]
    assert second_run["result"], second_run

    with salt.utils.files.fopen(name) as handle:
        serialized_data = salt.serializers.configparser.deserialize(handle)
    # Debug output to show how the two differ if an assertion below fails
    log.debug("serialized_data = %r", serialized_data)
    log.debug("merged = %r", merged)
    actual_foo = serialized_data["foo"]
    expected_foo = merged["foo"]
    # serializing with a default of 'y' adds y = not_used into foo
    assert actual_foo["y"] == expected_foo["y"]
    # deserializing with default x = baz interpolates %(x)s, so bar == baz
    assert actual_foo["bar"] == expected_foo["bar"]
@with_tempfile(create=False)
def test_serializer_plist_binary_file_open(self, name):
    """
    Serialize and deserialize binary plists, which relies on the "rb" file
    open arguments used specifically for this formatter.
    """
    data1 = {"foo": {"bar": "%(x)s"}}
    data2 = {"foo": {"abc": 123}}
    merged = {"foo": {"abc": 123, "bar": "%(x)s"}}
    binary_opts = {"fmt": "FMT_BINARY"}

    initial = self.run_state(
        "file.serialize",
        name=name,
        dataset=data1,
        formatter="plist",
        serializer_opts=[binary_opts],
    )
    initial = initial[next(iter(initial))]
    assert initial["result"], initial

    # merge_if_exists makes the state read the file back, which exercises
    # the deserializer as well.
    merged_run = self.run_state(
        "file.serialize",
        name=name,
        dataset=data2,
        formatter="plist",
        merge_if_exists=True,
        serializer_opts=[binary_opts],
    )
    merged_run = merged_run[next(iter(merged_run))]
    assert merged_run["result"], merged_run

    with salt.utils.files.fopen(name, "rb") as handle:
        serialized_data = salt.serializers.plist.deserialize(handle)
    # what is on disk must match the expected merge result
    assert serialized_data["foo"] == merged["foo"]
@with_tempfile(create=False)
def test_serializer_plist_file_open(self, name):
    """
    Serialize and deserialize non binary plists with the new line
    concatenation.
    """
    data1 = {"foo": {"bar": "%(x)s"}}
    data2 = {"foo": {"abc": 123}}
    merged = {"foo": {"abc": 123, "bar": "%(x)s"}}

    initial = self.run_state(
        "file.serialize", name=name, dataset=data1, formatter="plist",
    )
    initial = initial[next(iter(initial))]
    assert initial["result"], initial

    # merge_if_exists makes the state read the file back, which exercises
    # the deserializer as well.
    merged_run = self.run_state(
        "file.serialize",
        name=name,
        dataset=data2,
        formatter="plist",
        merge_if_exists=True,
    )
    merged_run = merged_run[next(iter(merged_run))]
    assert merged_run["result"], merged_run

    with salt.utils.files.fopen(name, "rb") as handle:
        serialized_data = salt.serializers.plist.deserialize(handle)
    # what is on disk must match the expected merge result
    assert serialized_data["foo"] == merged["foo"]
@with_tempdir()
def test_replace_issue_18841_omit_backup(self, base_dir):
    """
    Test the (mis-)behaviour of file.replace as described in #18841:

    Using file.replace in a way which shouldn't modify the file at all
    results in changed mtime of the original file and a backup file being created.

    # Case description

    The tested file contains multiple lines
    The tested file contains a line already matching the replacement (no change needed)
    The tested file's content shouldn't change at all
    The tested file's mtime shouldn't change at all
    No backup file should be created, although backup=False isn't explicitly defined
    """
    test_name = "test_replace_issue_18841_omit_backup"
    path_in = os.path.join(
        RUNTIME_VARS.FILES, "file.replace", "{}.in".format(test_name)
    )
    path_test = os.path.join(base_dir, test_name)
    # create test file based on initial template
    shutil.copyfile(path_in, path_test)
    # get (m|a)time of file
    fstats_orig = os.stat(path_test)
    # define how far we predate the file
    age = 5 * 24 * 60 * 60
    # set (a|m)time of file 5 days into the past; os.utime takes the pair
    # in (atime, mtime) order
    os.utime(path_test, (fstats_orig.st_atime - age, fstats_orig.st_mtime - age))
    # Read the backdated mtime back from disk instead of recomputing it, so
    # float rounding or filesystem timestamp granularity cannot produce a
    # spurious mismatch in the comparison below.
    mtime_before = os.stat(path_test).st_mtime
    ret = self.run_state(
        "file.replace",
        name=path_test,
        pattern="^hello world$",
        repl="goodbye world",
        show_changes=True,
        flags=["IGNORECASE"],
    )
    # get (m|a)time of file
    fstats_post = os.stat(path_test)
    # ensure, the file content didn't change
    self.assertTrue(filecmp.cmp(path_in, path_test))
    # ensure no backup file was created
    self.assertFalse(os.path.exists(path_test + ".bak"))
    # ensure the file's mtime didn't change (a two-argument assertTrue
    # would treat the second value as the failure message and always pass)
    self.assertEqual(fstats_post.st_mtime, mtime_before)
    # ensure, all 'file.replace' runs reported success
    self.assertSaltTrueReturn(ret)
@with_tempfile()
def test_comment(self, name):
    """
    file.comment

    Covers a dry run on an uncommented file, two real runs in a row, and
    a dry run on the already-commented file.
    """
    regex = "^comment"

    def read_back():
        with salt.utils.files.fopen(name, "r") as handle:
            return handle.read()

    # seed the file with a single line to comment out
    with salt.utils.files.fopen(name, "w+") as handle:
        handle.write("comment_me")
    # test=True on the uncommented file: a change is pending, so "None"
    ret = self.run_state("file.comment", test=True, name=name, regex=regex)
    self.assertSaltNoneReturn(ret)
    # first real run: succeeds and comments the line
    ret = self.run_state("file.comment", name=name, regex=regex)
    self.assertSaltTrueReturn(ret)
    self.assertTrue(read_back().startswith("#comment"))
    # second real run: still succeeds, line still commented
    ret = self.run_state("file.comment", name=name, regex=regex)
    self.assertSaltTrueReturn(ret)
    self.assertTrue(read_back().startswith("#comment"))
    # test=True on the commented file: "True" now rather than "None"
    ret = self.run_state("file.comment", test=True, name=name, regex=regex)
    self.assertSaltTrueReturn(ret)
@with_tempfile()
def test_test_comment(self, name):
    """
    file.comment test interface

    A dry run must leave the file uncommented and return "None".
    """
    with salt.utils.files.fopen(name, "w+") as handle:
        handle.write("comment_me")
    ret = self.run_state("file.comment", test=True, name=name, regex=".*comment.*")
    with salt.utils.files.fopen(name, "r") as handle:
        current = handle.read()
        self.assertNotIn("#comment", current)
    self.assertSaltNoneReturn(ret)
@with_tempfile()
def test_uncomment(self, name):
    """
    file.uncomment

    The leading "#" must be gone after the state ran.
    """
    with salt.utils.files.fopen(name, "w+") as handle:
        handle.write("#comment_me")
    ret = self.run_state("file.uncomment", name=name, regex="^comment")
    with salt.utils.files.fopen(name, "r") as handle:
        current = handle.read()
        self.assertNotIn("#comment", current)
    self.assertSaltTrueReturn(ret)
@with_tempfile()
def test_test_uncomment(self, name):
    """
    file.uncomment test interface

    A dry run must leave the comment in place and return "None".
    """
    with salt.utils.files.fopen(name, "w+") as handle:
        handle.write("#comment_me")
    ret = self.run_state("file.uncomment", test=True, name=name, regex="^comment.*")
    with salt.utils.files.fopen(name, "r") as handle:
        current = handle.read()
        self.assertIn("#comment", current)
    self.assertSaltNoneReturn(ret)
@with_tempfile()
def test_append(self, name):
    """
    file.append

    The appended text must be present in the file afterwards.
    """
    with salt.utils.files.fopen(name, "w+") as handle:
        handle.write("#salty!")
    ret = self.run_state("file.append", name=name, text="cheese")
    with salt.utils.files.fopen(name, "r") as handle:
        current = handle.read()
        self.assertIn("cheese", current)
    self.assertSaltTrueReturn(ret)
@with_tempfile()
def test_test_append(self, name):
    """
    file.append test interface

    A dry run must not add the text and must return "None".
    """
    with salt.utils.files.fopen(name, "w+") as handle:
        handle.write("#salty!")
    ret = self.run_state("file.append", test=True, name=name, text="cheese")
    with salt.utils.files.fopen(name, "r") as handle:
        current = handle.read()
        self.assertNotIn("cheese", current)
    self.assertSaltNoneReturn(ret)
@with_tempdir()
def test_append_issue_1864_makedirs(self, base_dir):
    """
    file.append creating the file when it is missing, and optionally its
    parent directories as well
    """
    fname = "append_issue_1864_makedirs"
    nested_dir = os.path.join(base_dir, "issue_1864")

    # Missing file directly in the base dir
    ret = self.run_state(
        "file.append", name=os.path.join(base_dir, fname), text="cheese", makedirs=True
    )
    self.assertSaltTrueReturn(ret)

    # Missing file inside a directory that does not exist yet
    ret = self.run_state(
        "file.append", name=os.path.join(nested_dir, fname), text="cheese", makedirs=True
    )
    self.assertSaltTrueReturn(ret)

    # Directory exists by now; missing file, makedirs left at its default
    name = os.path.join(nested_dir, fname + "2")
    ret = self.run_state("file.append", name=name, text="cheese")
    self.assertSaltTrueReturn(ret)
    self.assertTrue(os.path.isfile(name))
@with_tempdir()
def test_prepend_issue_27401_makedirs(self, base_dir):
    """
    file.prepend creating the file when it is missing, and optionally its
    parent directories as well
    """
    fname = "prepend_issue_27401"
    nested_dir = os.path.join(base_dir, "issue_27401")

    # Missing file directly in the base dir
    ret = self.run_state(
        "file.prepend", name=os.path.join(base_dir, fname), text="cheese", makedirs=True
    )
    self.assertSaltTrueReturn(ret)

    # Missing file inside a directory that does not exist yet
    ret = self.run_state(
        "file.prepend", name=os.path.join(nested_dir, fname), text="cheese", makedirs=True
    )
    self.assertSaltTrueReturn(ret)

    # Directory exists by now; missing file, makedirs left at its default
    name = os.path.join(nested_dir, fname + "2")
    ret = self.run_state("file.prepend", name=name, text="cheese")
    self.assertSaltTrueReturn(ret)
    self.assertTrue(os.path.isfile(name))
@with_tempfile()
def test_touch(self, name):
    """
    file.touch

    The target must be a regular file afterwards and the state must
    report success.
    """
    ret = self.run_state("file.touch", name=name)
    is_regular_file = os.path.isfile(name)
    self.assertTrue(is_regular_file)
    self.assertSaltTrueReturn(ret)
@with_tempfile(create=False)
def test_test_touch(self, name):
    """
    file.touch test interface

    A dry run must not create the file and must return "None".
    """
    ret = self.run_state("file.touch", test=True, name=name)
    is_regular_file = os.path.isfile(name)
    self.assertFalse(is_regular_file)
    self.assertSaltNoneReturn(ret)
@with_tempdir()
def test_touch_directory(self, base_dir):
    """
    file.touch a directory

    Touching an existing directory must succeed and leave it a directory.
    """
    target_dir = os.path.join(base_dir, "touch_test_dir")
    os.mkdir(target_dir)
    ret = self.run_state("file.touch", name=target_dir)
    self.assertSaltTrueReturn(ret)
    self.assertTrue(os.path.isdir(target_dir))
@with_tempdir()
def test_issue_2227_file_append(self, base_dir):
    """
    Text to append includes a percent symbol

    Appending the same text a second time must leave the file unchanged.
    """
    tmp_file_append = os.path.join(base_dir, "test.append")
    percent_text = "HISTTIMEFORMAT='%F %T '"

    # Use existing states to build a file with some content to test against
    self.run_state("file.touch", name=tmp_file_append)
    for source in ("salt://testappend/firstif", "salt://testappend/secondif"):
        self.run_state("file.append", name=tmp_file_append, source=source)

    def append_and_read():
        # Append the text, require success, and return the file content
        ret = self.run_state("file.append", name=tmp_file_append, text=percent_text)
        self.assertSaltTrueReturn(ret)
        with salt.utils.files.fopen(tmp_file_append, "r") as handle:
            return handle.read()

    # Now our real test
    try:
        contents_pre = append_and_read()
        # It should not append text again
        contents_post = append_and_read()
        self.assertEqual(contents_pre, contents_post)
    except AssertionError:
        # Keep a copy of the file next to it before re-raising
        if os.path.exists(tmp_file_append):
            shutil.copy(tmp_file_append, tmp_file_append + ".bak")
        raise
@with_tempdir()
def test_issue_2401_file_comment(self, base_dir):
    """
    Run the same file.comment template twice: the first run must comment
    the line, the second must report that it is already commented.
    """
    # NOTE(review): indentation inside the template literals below looks
    # collapsed in this copy of the file - confirm against upstream.
    tmp_file = os.path.join(base_dir, "issue-2041-comment.txt")
    with salt.utils.files.fopen(tmp_file, "w") as handle:
        handle.write("hello\nworld\n")
    # Build the sls template
    template = "\n".join(
        [
            "{}:".format(tmp_file),
            " file.comment:",
            " - regex: ^world",
        ]
    )
    try:
        # First pass: the line gets commented
        ret = self.run_function("state.template_str", [template], timeout=120)
        self.assertSaltTrueReturn(ret)
        self.assertNotInSaltComment("Pattern already commented", ret)
        self.assertInSaltComment("Commented lines successfully", ret)
        # Second pass: nothing left to comment
        ret = self.run_function("state.template_str", [template], timeout=120)
        self.assertSaltTrueReturn(ret)
        self.assertInSaltComment("Pattern already commented", ret)
    except AssertionError:
        # Keep a copy of the file next to it before re-raising
        shutil.copy(tmp_file, tmp_file + ".bak")
        raise
@with_tempdir()
def test_issue_2379_file_append(self, base_dir):
    """
    file.append must add the line even though commented-out variants of
    it are already present in the file.
    """
    # NOTE(review): indentation inside the template literals below looks
    # collapsed in this copy of the file - confirm against upstream.
    tmp_file = os.path.join(base_dir, "issue-2379-file-append.txt")
    seed_content = (
        "hello\nworld\n"  # Some junk
        "#PermitRootLogin yes\n"  # Commented text
        "# PermitRootLogin yes\n"  # Commented text with space
    )
    with salt.utils.files.fopen(tmp_file, "w") as handle:
        handle.write(seed_content)
    # Build the sls template
    template = "\n".join(
        [
            "{}:".format(tmp_file),
            " file.append:",
            " - text: PermitRootLogin yes",
        ]
    )
    try:
        ret = self.run_function("state.template_str", [template])
        self.assertSaltTrueReturn(ret)
        self.assertInSaltComment("Appended 1 lines", ret)
    except AssertionError:
        # Keep a copy of the file next to it before re-raising
        shutil.copy(tmp_file, tmp_file + ".bak")
        raise
@skipIf(IS_WINDOWS, "Mode not available in Windows")
@with_tempdir(create=False)
@with_tempdir(create=False)
def test_issue_2726_mode_kwarg(self, dir1, dir2):
    """
    file.recurse must reject ``mode`` with a helpful comment and accept
    ``dir_mode`` / ``file_mode``.
    """
    # NOTE(review): indentation inside the template literals below looks
    # collapsed in this copy of the file - confirm against upstream.

    # Wrong usage: "mode" is passed to file.recurse
    bad_mode_kwarg_testfile = os.path.join(dir1, "bad_mode_kwarg", "testfile")
    bad_sls = os.linesep.join(
        [
            "{}:".format(bad_mode_kwarg_testfile),
            " file.recurse:",
            " - source: salt://testfile",
            " - mode: 644",
        ]
    )
    ret = self.run_function("state.template_str", [bad_sls])
    self.assertSaltFalseReturn(ret)
    self.assertInSaltComment(
        "'mode' is not allowed in 'file.recurse'. Please use 'file_mode' and 'dir_mode'.",
        ret,
    )
    self.assertNotInSaltComment(
        "TypeError: managed() got multiple values for keyword argument 'mode'",
        ret,
    )

    # Correct usage: dir_mode and file_mode
    good_mode_kwargs_testfile = os.path.join(dir2, "good_mode_kwargs", "testappend")
    good_sls = os.linesep.join(
        [
            "{}:".format(good_mode_kwargs_testfile),
            " file.recurse:",
            " - source: salt://testappend",
            " - dir_mode: 744",
            " - file_mode: 644",
        ]
    )
    ret = self.run_function("state.template_str", [good_sls])
    self.assertSaltTrueReturn(ret)
@with_tempdir()
def test_issue_8343_accumulated_require_in(self, base_dir):
    """
    Two file.accumulated states feed two file.blockreplace states on the
    same file via require_in; the resulting file must contain both managed
    blocks with their accumulated text.
    """
    # NOTE(review): indentation inside the sls literals below looks
    # collapsed in this copy of the file - confirm against upstream.
    template_path = os.path.join(RUNTIME_VARS.TMP_STATE_TREE, "issue-8343.sls")
    testcase_filedest = os.path.join(base_dir, "issue-8343.txt")
    # Start from a clean slate
    for leftover in (template_path, testcase_filedest):
        if os.path.exists(leftover):
            os.remove(leftover)
    sls_template = [
        "{0}:",
        " file.managed:",
        " - contents: |",
        " #",
        "",
        "prepend-foo-accumulator-from-pillar:",
        " file.accumulated:",
        " - require_in:",
        " - file: prepend-foo-management",
        " - filename: {0}",
        " - text: |",
        " foo",
        "",
        "append-foo-accumulator-from-pillar:",
        " file.accumulated:",
        " - require_in:",
        " - file: append-foo-management",
        " - filename: {0}",
        " - text: |",
        " bar",
        "",
        "prepend-foo-management:",
        " file.blockreplace:",
        " - name: {0}",
        ' - marker_start: "#-- start salt managed zonestart -- PLEASE, DO NOT EDIT"',
        ' - marker_end: "#-- end salt managed zonestart --"',
        " - content: ''",
        " - prepend_if_not_found: True",
        " - backup: '.bak'",
        " - show_changes: True",
        "",
        "append-foo-management:",
        " file.blockreplace:",
        " - name: {0}",
        ' - marker_start: "#-- start salt managed zoneend -- PLEASE, DO NOT EDIT"',
        ' - marker_end: "#-- end salt managed zoneend --"',
        " - content: ''",
        " - append_if_not_found: True",
        " - backup: '.bak2'",
        " - show_changes: True",
        "",
    ]
    rendered_sls = os.linesep.join(sls_template).format(testcase_filedest)
    with salt.utils.files.fopen(template_path, "w") as handle:
        handle.write(rendered_sls)
    ret = self.run_function("state.sls", mods="issue-8343")
    # every single state in the run must have succeeded
    for state_id, state_ret in ret.items():
        self.assertSaltTrueReturn({state_id: state_ret})
    with salt.utils.files.fopen(testcase_filedest) as handle:
        contents = handle.read().split(os.linesep)
    expected = [
        "#-- start salt managed zonestart -- PLEASE, DO NOT EDIT",
        "foo",
        "#-- end salt managed zonestart --",
        "#",
        "#-- start salt managed zoneend -- PLEASE, DO NOT EDIT",
        "bar",
        "#-- end salt managed zoneend --",
        "",
    ]
    expected_lines = [salt.utils.stringutils.to_str(line) for line in expected]
    self.assertEqual(expected_lines, contents)
@with_tempdir()
@skipIf(
    salt.utils.platform.is_darwin() and six.PY2, "This test hangs on OS X on Py2"
)
def test_issue_11003_immutable_lazy_proxy_sum(self, base_dir):
    """
    Accumulate text through both require_in and watch_in into a single
    file.blockreplace and check that the managed block holds exactly the
    accumulated lines.
    """
    # causes the Import-Module ServerManager error on Windows
    # NOTE(review): indentation inside the sls literals below looks
    # collapsed in this copy of the file - confirm against upstream.
    template_path = os.path.join(RUNTIME_VARS.TMP_STATE_TREE, "issue-11003.sls")
    testcase_filedest = os.path.join(base_dir, "issue-11003.txt")
    sls_template = [
        "a{0}:",
        " file.absent:",
        " - name: {0}",
        "",
        "{0}:",
        " file.managed:",
        " - contents: |",
        " #",
        "",
        "test-acc1:",
        " file.accumulated:",
        " - require_in:",
        " - file: final",
        " - filename: {0}",
        " - text: |",
        " bar",
        "",
        "test-acc2:",
        " file.accumulated:",
        " - watch_in:",
        " - file: final",
        " - filename: {0}",
        " - text: |",
        " baz",
        "",
        "final:",
        " file.blockreplace:",
        " - name: {0}",
        ' - marker_start: "#-- start managed zone PLEASE, DO NOT EDIT"',
        ' - marker_end: "#-- end managed zone"',
        " - content: ''",
        " - append_if_not_found: True",
        " - show_changes: True",
    ]
    rendered_sls = os.linesep.join(sls_template).format(testcase_filedest)
    with salt.utils.files.fopen(template_path, "w") as handle:
        handle.write(rendered_sls)
    ret = self.run_function("state.sls", mods="issue-11003", timeout=600)
    # every single state in the run must have succeeded
    for state_id, state_ret in ret.items():
        self.assertSaltTrueReturn({state_id: state_ret})
    with salt.utils.files.fopen(testcase_filedest) as handle:
        contents = handle.read().split(os.linesep)
    # Slice out the lines strictly between the two markers
    first_inside = contents.index("#-- start managed zone PLEASE, DO NOT EDIT") + 1
    end_marker = contents.index("#-- end managed zone")
    block_contents = contents[first_inside:end_marker]
    # Each expected line must be present (remove raises otherwise) ...
    for expected_line in ("", "bar", "baz"):
        block_contents.remove(expected_line)
    # ... and nothing else may be left over
    self.assertEqual(block_contents, [])
@with_tempdir()
def test_issue_8947_utf8_sls(self, base_dir):
    """
    Test some file operations with UTF-8 characters in the SLS.

    Two ``file.managed`` states write the same file, whose name and contents
    are Korean text, one after the other. The test checks the comment and
    changes reported for each state. Everywhere except Windows a ``cmd.run``
    state cats the file back; on Windows the file is read with ``type``
    through its short path name instead.

    This is more generic than just a file test. Feel free to move.
    """
    self.maxDiff = None
    korean_1 = "한국어 시험"
    korean_2 = "첫 번째 행"
    korean_3 = "마지막 행"
    test_file = os.path.join(base_dir, "{}.txt".format(korean_1))
    template_path = os.path.join(RUNTIME_VARS.TMP_STATE_TREE, "issue-8947.sls")

    # Name the substitutions explicitly instead of passing **locals(), so
    # the template only ever sees the values it is meant to see.
    sls_vars = {
        "test_file": test_file,
        "korean_1": korean_1,
        "korean_2": korean_2,
        "korean_3": korean_3,
    }

    # Create the sls template. The nesting below is required for the YAML
    # to describe two file.managed states.
    template = textwrap.dedent(
        """\
        some-utf8-file-create:
          file.managed:
            - name: {test_file}
            - contents: {korean_1}
            - makedirs: True
            - replace: True
            - show_diff: True
        some-utf8-file-create2:
          file.managed:
            - name: {test_file}
            - contents: |
               {korean_2}
               {korean_1}
               {korean_3}
            - replace: True
            - show_diff: True
        """.format(
            **sls_vars
        )
    )
    if not salt.utils.platform.is_windows():
        template += textwrap.dedent(
            """\
            some-utf8-file-content-test:
              cmd.run:
                - name: 'cat "{test_file}"'
                - require:
                  - file: some-utf8-file-create2
            """.format(
                **sls_vars
            )
        )

    # Save template file
    with salt.utils.files.fopen(template_path, "wb") as fp_:
        fp_.write(salt.utils.stringutils.to_bytes(template))

    try:
        result = self.run_function("state.sls", mods="issue-8947")
        if not isinstance(result, dict):
            raise AssertionError(
                (
                    "Something went really wrong while testing this sls:" " {}"
                ).format(repr(result))
            )

        # Unified diff of the one-line file against the three-line file.
        diff = "--- \n+++ \n@@ -1 +1,3 @@\n"
        diff += ("+첫 번째 행{0}" " 한국어 시험{0}" "+마지막 행{0}").format(os.linesep)

        # Key the results by state ID (second field of the "_|-" tag).
        ret = {x.split("_|-")[1]: y for x, y in result.items()}

        # Confirm initial creation of file
        self.assertEqual(
            ret["some-utf8-file-create"]["comment"],
            "File {} updated".format(test_file),
        )
        self.assertEqual(
            ret["some-utf8-file-create"]["changes"], {"diff": "New file"}
        )

        # Confirm file was modified and that the diff was as expected
        self.assertEqual(
            ret["some-utf8-file-create2"]["comment"],
            "File {} updated".format(test_file),
        )
        self.assertEqual(ret["some-utf8-file-create2"]["changes"], {"diff": diff})

        if salt.utils.platform.is_windows():
            import subprocess
            import win32api

            p = subprocess.Popen(
                salt.utils.stringutils.to_str(
                    "type {}".format(win32api.GetShortPathName(test_file))
                ),
                shell=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )
            # communicate() drains both pipes and waits for the child, so
            # the process is reaped and a full stderr pipe cannot block it.
            out, _ = p.communicate()
            self.assertEqual(
                out.decode("utf-8"),
                os.linesep.join((korean_2, korean_1, korean_3)) + os.linesep,
            )
        else:
            self.assertEqual(
                ret["some-utf8-file-content-test"]["comment"],
                'Command "cat "{}"" run'.format(test_file),
            )
            self.assertEqual(
                ret["some-utf8-file-content-test"]["changes"]["stdout"],
                "\n".join((korean_2, korean_1, korean_3)),
            )
    finally:
        # Best-effort cleanup of the generated SLS; a missing file is fine.
        try:
            os.remove(template_path)
        except OSError:
            pass
@skip_if_not_root
@skipIf(not HAS_PWD, "pwd not available. Skipping test")
@skipIf(not HAS_GRP, "grp not available. Skipping test")
@with_system_user_and_group(
    TEST_SYSTEM_USER, TEST_SYSTEM_GROUP, on_existing="delete", delete=True
)
@with_tempdir()
@skipIf(salt.utils.platform.is_freebsd(), "Test is failing on FreeBSD")
def test_issue_12209_follow_symlinks(self, tempdir, user, group):
    """
    Recursive chown with follow_symlinks=True must change the link target
    and leave the symlink itself alone.
    """
    # Layout: <tempdir>/one is a real directory, <tempdir>/two links to it.
    real_dir = os.path.join(tempdir, "one")
    link_path = os.path.join(tempdir, "two")
    os.mkdir(real_dir)
    os.symlink(real_dir, link_path)

    state_kwargs = {
        "name": tempdir,
        "follow_symlinks": True,
        "user": user,
        "group": group,
        "recurse": ["user", "group"],
    }
    ret = self.run_state("file.directory", **state_kwargs)
    self.assertSaltTrueReturn(ret)

    # Do not trust the state's own verdict: inspect ownership directly.
    # lstat() is used for the link so the link itself is examined.
    real_stat = os.stat(real_dir)
    link_stat = os.lstat(link_path)

    # The directory belongs to the test user, the link still to root.
    self.assertEqual(pwd.getpwuid(real_stat.st_uid).pw_name, user)
    self.assertEqual(pwd.getpwuid(link_stat.st_uid).pw_name, "root")
    self.assertEqual(grp.getgrgid(real_stat.st_gid).gr_name, group)
    if salt.utils.path.which("id"):
        root_group = self.run_function("user.primary_group", ["root"])
        self.assertEqual(grp.getgrgid(link_stat.st_gid).gr_name, root_group)
@skip_if_not_root
@skipIf(not HAS_PWD, "pwd not available. Skipping test")
@skipIf(not HAS_GRP, "grp not available. Skipping test")
@with_system_user_and_group(
    TEST_SYSTEM_USER, TEST_SYSTEM_GROUP, on_existing="delete", delete=True
)
@with_tempdir()
def test_issue_12209_no_follow_symlinks(self, tempdir, user, group):
    """
    Recursive chown with follow_symlinks=False must change ownership of
    the symlink itself as well as of the real directory.
    """
    # Layout: <tempdir>/one is a real directory, <tempdir>/two links to it.
    real_dir = os.path.join(tempdir, "one")
    link_path = os.path.join(tempdir, "two")
    os.mkdir(real_dir)
    os.symlink(real_dir, link_path)

    state_kwargs = {
        "name": tempdir,
        "follow_symlinks": False,
        "user": user,
        "group": group,
        "recurse": ["user", "group"],
    }
    ret = self.run_state("file.directory", **state_kwargs)
    self.assertSaltTrueReturn(ret)

    # Do not trust the state's own verdict: inspect ownership directly.
    # Without symlink following, the link is expected to be owned by the
    # test user and group, just like the directory it points at.
    real_stat = os.stat(real_dir)
    link_stat = os.lstat(link_path)
    self.assertEqual(pwd.getpwuid(real_stat.st_uid).pw_name, user)
    self.assertEqual(pwd.getpwuid(link_stat.st_uid).pw_name, user)
    self.assertEqual(grp.getgrgid(real_stat.st_gid).gr_name, group)
    self.assertEqual(grp.getgrgid(link_stat.st_gid).gr_name, group)
@with_tempfile(create=False)
@with_tempfile()
def test_template_local_file(self, source, dest):
    """
    file.managed must accept a local template as its source, both as a
    ``file://`` URL and as a bare path.
    """
    with salt.utils.files.fopen(source, "w") as template_fh:
        template_fh.write("{{ foo }}\n")

    # Same source file, once with and once without the protocol prefix.
    for source_spec in ("file://" + source, "" + source):
        ret = self.run_state(
            "file.managed",
            name=dest,
            source=source_spec,
            template="jinja",
            context={"foo": "Hello world!"},
        )
        self.assertSaltTrueReturn(ret)
@with_tempfile()
def test_template_local_file_noclobber(self, source):
    """
    file.managed must refuse to run when the local source path and the
    destination path are one and the same file.
    """
    with salt.utils.files.fopen(source, "w") as template_fh:
        template_fh.write("{{ foo }}\n")

    state_kwargs = {
        "name": source,
        "source": source,
        "template": "jinja",
        "context": {"foo": "Hello world!"},
    }
    ret = self.run_state("file.managed", **state_kwargs)
    self.assertSaltFalseReturn(ret)

    # The return holds a single state entry; check its comment.
    first_tag = next(iter(ret))
    self.assertIn(
        "Source file cannot be the same as destination", ret[first_tag]["comment"]
    )
@with_tempfile(create=False)
@with_tempfile(create=False)
def test_issue_25250_force_copy_deletes(self, source, dest):
    """
    file.copy with force=True must leave the target in place, holding the
    source's contents.
    """
    # Seed source and destination with two different fixture files.
    hosts_fixture = os.path.join(RUNTIME_VARS.FILES, "hosts")
    cheese_fixture = os.path.join(RUNTIME_VARS.FILES, "file/base/cheese")
    shutil.copyfile(hosts_fixture, source)
    shutil.copyfile(cheese_fixture, dest)

    self.run_state("file.copy", name=dest, source=source, force=True)

    self.assertTrue(os.path.exists(dest))
    self.assertTrue(filecmp.cmp(source, dest))

    for leftover in (source, dest):
        os.remove(leftover)
@destructiveTest
@skip_if_not_root
@skipIf(IS_WINDOWS, "Windows does not report any file modes. Skipping.")
@with_tempfile()
def test_file_copy_make_dirs(self, source):
    """
    file.copy with makedirs=True must apply the requested user and mode to
    the copied file and to the directories it creates.
    """
    shutil.copyfile(os.path.join(RUNTIME_VARS.FILES, "hosts"), source)

    top_dir = self.tmp_dir / "dir1"
    sub_dir = top_dir / "dir2"
    dest = sub_dir / "copied_file.txt"
    self.addCleanup(salt.utils.files.rm_rf, str(dest.parent.parent))

    user = "salt"
    mode = "0644"
    ret = self.run_function("user.add", [user])
    self.assertTrue(ret, "Failed to add user. Are you running as sudo?")

    ret = self.run_state(
        "file.copy",
        name=str(dest),
        source=source,
        user=user,
        makedirs=True,
        mode=mode,
    )
    self.assertSaltTrueReturn(ret)

    # Check the file first, then each directory level above it.
    for path in (str(dest), str(dest.parent), str(dest.parent.parent)):
        user_check = self.run_function("file.get_user", [path])
        mode_check = self.run_function("file.get_mode", [path])
        self.assertEqual(user_check, user)
        self.assertEqual(salt.utils.files.normalize_mode(mode_check), mode)
@skip_if_not_root
@skipIf(not HAS_PWD, "pwd not available. Skipping test")
@skipIf(not HAS_GRP, "grp not available. Skipping test")
@with_system_user_and_group(
    TEST_SYSTEM_USER, TEST_SYSTEM_GROUP, on_existing="delete", delete=True
)
def test_owner_after_setuid(self, user, group):
    """
    Check a managed file's user, group and mode when the mode carries
    setuid or setgid.

    Python's os.chown() resets the setuid/setgid bits, which is why both
    ownership and mode are verified together.
    https://github.com/saltstack/salt/pull/45257
    """
    target = self.tmp_dir / "file_with_setuid"
    self.addCleanup(salt.utils.files.safe_rm, str(target))

    # What the state is asked to produce.
    wanted = {
        "file": str(target),
        "user": user,
        "group": group,
        "mode": "4750",
    }

    ret = self.run_state(
        "file.managed",
        name=wanted["file"],
        user=wanted["user"],
        group=wanted["group"],
        mode=wanted["mode"],
    )

    # What actually ended up on disk.
    on_disk = target.stat()
    actual = {
        "user": pwd.getpwuid(on_disk.st_uid).pw_name,
        "group": grp.getgrgid(on_disk.st_gid).gr_name,
        "mode": oct(stat.S_IMODE(on_disk.st_mode)),
    }

    self.assertSaltTrueReturn(ret)
    self.assertEqual(wanted["user"], actual["user"])
    self.assertEqual(wanted["group"], actual["group"])
    # oct() prefixes its result; strip that before comparing.
    self.assertEqual(wanted["mode"], actual["mode"].lstrip("0Oo"))
def test_binary_contents(self):
    """
    file.managed given binary ``contents`` must succeed rather than
    produce a traceback.
    """
    gif_path = self.tmp_dir / "1px.gif"
    self.addCleanup(salt.utils.files.safe_rm, str(gif_path))

    result = self.run_state(
        "file.managed", name=str(gif_path), contents=BINARY_FILE
    )
    self.assertSaltTrueReturn(result)
def test_binary_contents_twice(self):
    """
    Once a binary file has been created, a second run of the same state
    must also report success.
    """
    gif_path = self.tmp_dir / "1px.gif"
    self.addCleanup(salt.utils.files.safe_rm, str(gif_path))

    # Run 1 creates the file; run 2 checks it against the same contents.
    for _attempt in range(2):
        result = self.run_state(
            "file.managed", name=str(gif_path), contents=BINARY_FILE
        )
        self.assertSaltTrueReturn(result)
@skip_if_not_root
@skipIf(not HAS_PWD, "pwd not available. Skipping test")
@skipIf(not HAS_GRP, "grp not available. Skipping test")
@with_system_user_and_group(
    TEST_SYSTEM_USER, TEST_SYSTEM_GROUP, on_existing="delete", delete=True
)
@with_tempdir()
def test_issue_48336_file_managed_mode_setuid(self, tempdir, user, group):
    """
    Ensure the mode is correct when owner and group are changed in the
    same file.managed run.
    """
    managed_path = os.path.join(tempdir, "temp_file_issue_48336")

    ret = self.run_state(
        "file.managed",
        name=managed_path,
        user=user,
        group=group,
        mode="4750",
    )
    self.assertSaltTrueReturn(ret)

    # Read back mode, owner and group from the file itself.
    on_disk = os.stat(managed_path)
    raw_mode = oct(stat.S_IMODE(on_disk.st_mode))
    self.assertEqual(salt.utils.files.normalize_mode(raw_mode), "4750")
    self.assertEqual(pwd.getpwuid(on_disk.st_uid).pw_name, user)
    self.assertEqual(grp.getgrgid(on_disk.st_gid).gr_name, group)
@with_tempdir()
def test_issue_48557(self, tempdir):
    """
    file.line in insert mode must place the new line directly after the
    ``after`` line, using the platform line separator.
    """
    target = os.path.join(tempdir, "temp_file_issue_48557")

    # The trailing "" gives both versions a final line separator.
    before = os.linesep.join(["test1", "test2", "test3", ""]).encode("utf-8")
    after = os.linesep.join(["test1", "test2", "test4", "test3", ""]).encode(
        "utf-8"
    )

    with salt.utils.files.fopen(target, "wb") as out_fh:
        out_fh.write(before)

    ret = self.run_state(
        "file.line", name=target, after="test2", mode="insert", content="test4"
    )
    self.assertSaltTrueReturn(ret)

    with salt.utils.files.fopen(target, "rb") as in_fh:
        written = in_fh.read()
    self.assertEqual(written, after)
def test_managed_file_issue_51208(self):
    """
    A source file containing escaped double-quotes must arrive on the
    minion with identical contents.
    """
    managed = self.tmp_dir / "issue_51208.txt"
    self.addCleanup(salt.utils.files.safe_rm, str(managed))

    ret = self.run_state(
        "file.managed",
        name=str(managed),
        source="salt://issue-51208/vimrc.stub",
    )

    # Compare the fileserver copy with what the state wrote.
    fileserver_copy = pathlib.Path(
        RUNTIME_VARS.BASE_FILES, "issue-51208", "vimrc.stub"
    )
    master_data = fileserver_copy.read_text()
    minion_data = managed.read_text()
    self.assertEqual(master_data, minion_data)
    self.assertSaltTrueReturn(ret)
@with_tempfile()
def test_keyvalue(self, name):
    """
    file.keyvalue must uncomment a matching key, matched case-insensitively,
    and set its new value.
    """
    # Excerpt of a stock sshd_config in which every option is commented out.
    sshd_config = dedent(
        """\
        # This is the sshd server system-wide configuration file. See
        # sshd_config(5) for more information.
        # The strategy used for options in the default sshd_config shipped with
        # OpenSSH is to specify options with their default value where
        # possible, but leave them commented. Uncommented options override the
        # default value.
        #Port 22
        #AddressFamily any
        #ListenAddress 0.0.0.0
        #ListenAddress ::
        #HostKey /etc/ssh/ssh_host_rsa_key
        #HostKey /etc/ssh/ssh_host_ecdsa_key
        #HostKey /etc/ssh/ssh_host_ed25519_key
        # Ciphers and keying
        #RekeyLimit default none
        # Logging
        #SyslogFacility AUTH
        #LogLevel INFO
        # Authentication:
        #LoginGraceTime 2m
        #PermitRootLogin prohibit-password
        #StrictModes yes
        #MaxAuthTries 6
        #MaxSessions 10
        """
    )
    with salt.utils.files.fopen(name, "w+") as out_fh:
        out_fh.write(sshd_config)

    state_kwargs = {
        "name": name,
        "key": "permitrootlogin",
        "value": "no",
        "separator": " ",
        "uncomment": " #",
        "key_ignore_case": True,
    }
    ret = self.run_state("file.keyvalue", **state_kwargs)

    with salt.utils.files.fopen(name, "r") as in_fh:
        file_contents = in_fh.read()

    # The commented entry and its old value are gone, the new one is there.
    for removed in ("#PermitRootLogin", "prohibit-password"):
        self.assertNotIn(removed, file_contents)
    self.assertIn("PermitRootLogin no", file_contents)
    self.assertSaltTrueReturn(ret)
# Test case whose visible methods all drive the file.blockreplace state through
# self.run_state. The class attributes below are the shared fixtures: the two
# marker lines, the block content, and file bodies in various states.
- @pytest.mark.windows_whitelisted
- class BlockreplaceTest(ModuleCase, SaltReturnAssertsMixin):
# Marker lines that delimit the managed block.
- marker_start = "# start"
- marker_end = "# end"
# Content placed between the markers (ends in a newline).
- content = dedent(
- """\
- Line 1 of block
- Line 2 of block
- """
- )
# File body that contains no markers at all.
# NOTE(review): the explicit-newline variants further down contain a blank
# line ("\n\n") after "Hello world!" that is absent here, and
# with_matching_block_and_extra_newline is identical to with_matching_block
# in this copy. Blank lines inside these literals appear to have been lost;
# confirm against the upstream file before relying on them.
- without_block = dedent(
- """\
- Hello world!
- # comment here
- """
- )
# File body with both markers but different content between them.
- with_non_matching_block = dedent(
- """\
- Hello world!
- # start
- No match here
- # end
- # comment here
- """
- )
# As above, but the end marker shares a line with the preceding content.
- with_non_matching_block_and_marker_end_not_after_newline = dedent(
- """\
- Hello world!
- # start
- No match here# end
- # comment here
- """
- )
# File body whose block already equals `content`.
- with_matching_block = dedent(
- """\
- Hello world!
- # start
- Line 1 of block
- Line 2 of block
- # end
- # comment here
- """
- )
# Per its name, a matching block followed by an extra newline (see the
# NOTE(review) above: that newline is not visible in this copy).
- with_matching_block_and_extra_newline = dedent(
- """\
- Hello world!
- # start
- Line 1 of block
- Line 2 of block
- # end
- # comment here
- """
- )
# Matching block whose end marker shares a line with the last content line.
- with_matching_block_and_marker_end_not_after_newline = dedent(
- """\
- Hello world!
- # start
- Line 1 of block
- Line 2 of block# end
- # comment here
- """
- )
# The same fixtures spelled out with explicit "\n" and "\r\n" line endings,
# used by the line-separator auto-detection tests.
- content_explicit_posix_newlines = "Line 1 of block\n" "Line 2 of block\n"
- content_explicit_windows_newlines = "Line 1 of block\r\n" "Line 2 of block\r\n"
- without_block_explicit_posix_newlines = "Hello world!\n\n" "# comment here\n"
- without_block_explicit_windows_newlines = (
- "Hello world!\r\n\r\n" "# comment here\r\n"
- )
# Expected file bodies after the block has been prepended.
- with_block_prepended_explicit_posix_newlines = (
- "# start\n"
- "Line 1 of block\n"
- "Line 2 of block\n"
- "# end\n"
- "Hello world!\n\n"
- "# comment here\n"
- )
- with_block_prepended_explicit_windows_newlines = (
- "# start\r\n"
- "Line 1 of block\r\n"
- "Line 2 of block\r\n"
- "# end\r\n"
- "Hello world!\r\n\r\n"
- "# comment here\r\n"
- )
# Expected file bodies after the block has been appended.
- with_block_appended_explicit_posix_newlines = (
- "Hello world!\n\n"
- "# comment here\n"
- "# start\n"
- "Line 1 of block\n"
- "Line 2 of block\n"
- "# end\n"
- )
- with_block_appended_explicit_windows_newlines = (
- "Hello world!\r\n\r\n"
- "# comment here\r\n"
- "# start\r\n"
- "Line 1 of block\r\n"
- "Line 2 of block\r\n"
- "# end\r\n"
- )
@staticmethod
def _write(dest, content):
    """
    Overwrite ``dest`` with ``content``, converted to bytes and written in
    binary mode.
    """
    with salt.utils.files.fopen(dest, "wb") as out_fh:
        payload = salt.utils.stringutils.to_bytes(content)
        out_fh.write(payload)
@staticmethod
def _read(src):
    """
    Return the contents of ``src``, read in binary mode and converted to
    unicode.
    """
    with salt.utils.files.fopen(src, "rb") as in_fh:
        raw = in_fh.read()
        return salt.utils.stringutils.to_unicode(raw)
@with_tempfile()
def test_prepend(self, name):
    """
    Test blockreplace with prepend_if_not_found=True on a file that does
    not contain the block yet.
    """
    expected = "".join(
        (
            self.marker_start,
            os.linesep,
            self.content,
            self.marker_end,
            os.linesep,
            self.without_block,
        )
    )

    # Pass 1 uses content ending in a newline, pass 2 content without one;
    # both must produce the same file.
    for block_content in (self.content, self.content.rstrip("\r\n")):
        self._write(name, self.without_block)
        # The first run must report changes, the immediate re-run none.
        for check_changes in (self.assertTrue, self.assertFalse):
            ret = self.run_state(
                "file.blockreplace",
                name=name,
                content=block_content,
                marker_start=self.marker_start,
                marker_end=self.marker_end,
                prepend_if_not_found=True,
            )
            self.assertSaltTrueReturn(ret)
            check_changes(ret[next(iter(ret))]["changes"])
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_prepend_append_newline(self, name):
    """
    Test blockreplace with prepend_if_not_found=True and append_newline
    explicitly True, on a file that does not contain the block yet.
    """
    head = self.marker_start + os.linesep
    tail = self.marker_end + os.linesep + self.without_block

    # (content passed to the state, expected file body)
    passes = (
        # Pass 1: content ends in newline, so an extra separator follows it
        (self.content, head + self.content + os.linesep + tail),
        # Pass 2: content does not end in newline
        (self.content.rstrip("\r\n"), head + self.content + tail),
    )
    for block_content, expected in passes:
        self._write(name, self.without_block)
        # The first run must report changes, the immediate re-run none.
        for check_changes in (self.assertTrue, self.assertFalse):
            ret = self.run_state(
                "file.blockreplace",
                name=name,
                content=block_content,
                marker_start=self.marker_start,
                marker_end=self.marker_end,
                prepend_if_not_found=True,
                append_newline=True,
            )
            self.assertSaltTrueReturn(ret)
            check_changes(ret[next(iter(ret))]["changes"])
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_prepend_no_append_newline(self, name):
    """
    Test blockreplace with prepend_if_not_found=True and append_newline
    explicitly False, on a file that does not contain the block yet.
    """
    head = self.marker_start + os.linesep
    tail = self.marker_end + os.linesep + self.without_block
    stripped = self.content.rstrip("\r\n")

    # (content passed to the state, expected file body)
    passes = (
        # Pass 1: content ends in newline
        (self.content, head + self.content + tail),
        # Pass 2: content does not end in newline, so the end marker
        # follows it on the same line
        (stripped, head + stripped + tail),
    )
    for block_content, expected in passes:
        self._write(name, self.without_block)
        # The first run must report changes, the immediate re-run none.
        for check_changes in (self.assertTrue, self.assertFalse):
            ret = self.run_state(
                "file.blockreplace",
                name=name,
                content=block_content,
                marker_start=self.marker_start,
                marker_end=self.marker_end,
                prepend_if_not_found=True,
                append_newline=False,
            )
            self.assertSaltTrueReturn(ret)
            check_changes(ret[next(iter(ret))]["changes"])
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_append(self, name):
    """
    Test blockreplace with append_if_not_found=True on a file that does
    not contain the block yet.
    """
    expected = "".join(
        (
            self.without_block,
            self.marker_start,
            os.linesep,
            self.content,
            self.marker_end,
            os.linesep,
        )
    )

    # Pass 1 uses content ending in a newline, pass 2 content without one;
    # both must produce the same file.
    for block_content in (self.content, self.content.rstrip("\r\n")):
        self._write(name, self.without_block)
        # The first run must report changes, the immediate re-run none.
        for check_changes in (self.assertTrue, self.assertFalse):
            ret = self.run_state(
                "file.blockreplace",
                name=name,
                content=block_content,
                marker_start=self.marker_start,
                marker_end=self.marker_end,
                append_if_not_found=True,
            )
            self.assertSaltTrueReturn(ret)
            check_changes(ret[next(iter(ret))]["changes"])
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_append_append_newline(self, name):
    """
    Test blockreplace with append_if_not_found=True and append_newline
    explicitly True, on a file that does not contain the block yet.
    """
    head = self.without_block + self.marker_start + os.linesep
    tail = self.marker_end + os.linesep

    # (content passed to the state, expected file body)
    passes = (
        # Pass 1: content ends in newline, so an extra separator follows it
        (self.content, head + self.content + os.linesep + tail),
        # Pass 2: content does not end in newline
        (self.content.rstrip("\r\n"), head + self.content + tail),
    )
    for block_content, expected in passes:
        self._write(name, self.without_block)
        # The first run must report changes, the immediate re-run none.
        for check_changes in (self.assertTrue, self.assertFalse):
            ret = self.run_state(
                "file.blockreplace",
                name=name,
                content=block_content,
                marker_start=self.marker_start,
                marker_end=self.marker_end,
                append_if_not_found=True,
                append_newline=True,
            )
            self.assertSaltTrueReturn(ret)
            check_changes(ret[next(iter(ret))]["changes"])
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_append_no_append_newline(self, name):
    """
    Test blockreplace with append_if_not_found=True and append_newline
    explicitly False, on a file that does not contain the block yet.
    """
    head = self.without_block + self.marker_start + os.linesep
    tail = self.marker_end + os.linesep
    stripped = self.content.rstrip("\r\n")

    # (content passed to the state, expected file body)
    passes = (
        # Pass 1: content ends in newline
        (self.content, head + self.content + tail),
        # Pass 2: content does not end in newline, so the end marker
        # follows it on the same line
        (stripped, head + stripped + tail),
    )
    for block_content, expected in passes:
        self._write(name, self.without_block)
        # The first run must report changes, the immediate re-run none.
        for check_changes in (self.assertTrue, self.assertFalse):
            ret = self.run_state(
                "file.blockreplace",
                name=name,
                content=block_content,
                marker_start=self.marker_start,
                marker_end=self.marker_end,
                append_if_not_found=True,
                append_newline=False,
            )
            self.assertSaltTrueReturn(ret)
            check_changes(ret[next(iter(ret))]["changes"])
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_prepend_auto_line_separator(self, name):
    """
    Check line separator auto-detection when the block is prepended: the
    result must use the file's line endings, not the content's.
    """
    # (initial file body, content passed to the state, expected file body)
    scenarios = (
        # POSIX newlines to Windows newlines
        (
            self.without_block_explicit_windows_newlines,
            self.content_explicit_posix_newlines,
            self.with_block_prepended_explicit_windows_newlines,
        ),
        # Windows newlines to POSIX newlines
        (
            self.without_block_explicit_posix_newlines,
            self.content_explicit_windows_newlines,
            self.with_block_prepended_explicit_posix_newlines,
        ),
    )
    for initial, block_content, expected in scenarios:
        self._write(name, initial)
        # The first run must report changes, the immediate re-run none.
        for check_changes in (self.assertTrue, self.assertFalse):
            ret = self.run_state(
                "file.blockreplace",
                name=name,
                content=block_content,
                marker_start=self.marker_start,
                marker_end=self.marker_end,
                prepend_if_not_found=True,
            )
            self.assertSaltTrueReturn(ret)
            check_changes(ret[next(iter(ret))]["changes"])
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_append_auto_line_separator(self, name):
    """
    Check line separator auto-detection when the block is appended: the
    result must use the file's line endings, not the content's.
    """
    # (initial file body, content passed to the state, expected file body)
    scenarios = (
        # POSIX newlines to Windows newlines
        (
            self.without_block_explicit_windows_newlines,
            self.content_explicit_posix_newlines,
            self.with_block_appended_explicit_windows_newlines,
        ),
        # Windows newlines to POSIX newlines
        (
            self.without_block_explicit_posix_newlines,
            self.content_explicit_windows_newlines,
            self.with_block_appended_explicit_posix_newlines,
        ),
    )
    for initial, block_content, expected in scenarios:
        self._write(name, initial)
        # The first run must report changes, the immediate re-run none.
        for check_changes in (self.assertTrue, self.assertFalse):
            ret = self.run_state(
                "file.blockreplace",
                name=name,
                content=block_content,
                marker_start=self.marker_start,
                marker_end=self.marker_end,
                append_if_not_found=True,
            )
            self.assertSaltTrueReturn(ret)
            check_changes(ret[next(iter(ret))]["changes"])
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_non_matching_block(self, name):
    """
    Run blockreplace against a file whose marked block exists but holds
    different content.
    """
    initial = self.with_non_matching_block
    passes = (
        # Pass 1: content ends in newline
        (self.content, self.assertTrue, self.with_matching_block),
        # Pass 2: content does not end in newline
        (self.content.rstrip("\r\n"), self.assertTrue, self.with_matching_block),
    )
    for content, first_check, expected in passes:
        self._write(name, initial)
        # Passes 1a/2a: the re-run must not report any changes
        for check_changes in (first_check, self.assertFalse):
            ret = self.run_state(
                "file.blockreplace",
                name=name,
                content=content,
                marker_start=self.marker_start,
                marker_end=self.marker_end,
            )
            self.assertSaltTrueReturn(ret)
            check_changes(ret[next(iter(ret))]["changes"])
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_non_matching_block_append_newline(self, name):
    """
    Run blockreplace against a file whose marked block exists but holds
    different content, with append_newline explicitly set to True.
    """
    initial = self.with_non_matching_block
    passes = (
        # Pass 1: content ends in newline
        (
            self.content,
            self.assertTrue,
            self.with_matching_block_and_extra_newline,
        ),
        # Pass 2: content does not end in newline
        (self.content.rstrip("\r\n"), self.assertTrue, self.with_matching_block),
    )
    for content, first_check, expected in passes:
        self._write(name, initial)
        # Passes 1a/2a: the re-run must not report any changes
        for check_changes in (first_check, self.assertFalse):
            ret = self.run_state(
                "file.blockreplace",
                name=name,
                content=content,
                marker_start=self.marker_start,
                marker_end=self.marker_end,
                append_newline=True,
            )
            self.assertSaltTrueReturn(ret)
            check_changes(ret[next(iter(ret))]["changes"])
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_non_matching_block_no_append_newline(self, name):
    """
    Run blockreplace against a file whose marked block exists but holds
    different content, with append_newline explicitly set to False.
    """
    initial = self.with_non_matching_block
    passes = (
        # Pass 1: content ends in newline
        (self.content, self.assertTrue, self.with_matching_block),
        # Pass 2: content does not end in newline
        (
            self.content.rstrip("\r\n"),
            self.assertTrue,
            self.with_matching_block_and_marker_end_not_after_newline,
        ),
    )
    for content, first_check, expected in passes:
        self._write(name, initial)
        # Passes 1a/2a: the re-run must not report any changes
        for check_changes in (first_check, self.assertFalse):
            ret = self.run_state(
                "file.blockreplace",
                name=name,
                content=content,
                marker_start=self.marker_start,
                marker_end=self.marker_end,
                append_newline=False,
            )
            self.assertSaltTrueReturn(ret)
            check_changes(ret[next(iter(ret))]["changes"])
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_non_matching_block_and_marker_not_after_newline(self, name):
    """
    Run blockreplace against a file whose marked block holds different
    content and whose marker_end is not directly preceded by a newline.
    """
    initial = self.with_non_matching_block_and_marker_end_not_after_newline
    passes = (
        # Pass 1: content ends in newline
        (self.content, self.assertTrue, self.with_matching_block),
        # Pass 2: content does not end in newline
        (self.content.rstrip("\r\n"), self.assertTrue, self.with_matching_block),
    )
    for content, first_check, expected in passes:
        self._write(name, initial)
        # Passes 1a/2a: the re-run must not report any changes
        for check_changes in (first_check, self.assertFalse):
            ret = self.run_state(
                "file.blockreplace",
                name=name,
                content=content,
                marker_start=self.marker_start,
                marker_end=self.marker_end,
            )
            self.assertSaltTrueReturn(ret)
            check_changes(ret[next(iter(ret))]["changes"])
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_non_matching_block_and_marker_not_after_newline_append_newline(self, name):
    """
    Run blockreplace against a file whose marked block holds different
    content and whose marker_end is not directly preceded by a newline,
    with append_newline explicitly set to True.
    """
    initial = self.with_non_matching_block_and_marker_end_not_after_newline
    passes = (
        # Pass 1: content ends in newline
        (
            self.content,
            self.assertTrue,
            self.with_matching_block_and_extra_newline,
        ),
        # Pass 2: content does not end in newline
        (self.content.rstrip("\r\n"), self.assertTrue, self.with_matching_block),
    )
    for content, first_check, expected in passes:
        self._write(name, initial)
        # Passes 1a/2a: the re-run must not report any changes
        for check_changes in (first_check, self.assertFalse):
            ret = self.run_state(
                "file.blockreplace",
                name=name,
                content=content,
                marker_start=self.marker_start,
                marker_end=self.marker_end,
                append_newline=True,
            )
            self.assertSaltTrueReturn(ret)
            check_changes(ret[next(iter(ret))]["changes"])
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_non_matching_block_and_marker_not_after_newline_no_append_newline(
    self, name
):
    """
    Run blockreplace against a file whose marked block holds different
    content and whose marker_end is not directly preceded by a newline,
    with append_newline explicitly set to False.
    """
    initial = self.with_non_matching_block_and_marker_end_not_after_newline
    passes = (
        # Pass 1: content ends in newline
        (self.content, self.assertTrue, self.with_matching_block),
        # Pass 2: content does not end in newline
        (
            self.content.rstrip("\r\n"),
            self.assertTrue,
            self.with_matching_block_and_marker_end_not_after_newline,
        ),
    )
    for content, first_check, expected in passes:
        self._write(name, initial)
        # Passes 1a/2a: the re-run must not report any changes
        for check_changes in (first_check, self.assertFalse):
            ret = self.run_state(
                "file.blockreplace",
                name=name,
                content=content,
                marker_start=self.marker_start,
                marker_end=self.marker_end,
                append_newline=False,
            )
            self.assertSaltTrueReturn(ret)
            check_changes(ret[next(iter(ret))]["changes"])
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_matching_block(self, name):
    """
    Run blockreplace against a file whose marked block already holds the
    requested content. None of the runs may report changes.
    """
    initial = self.with_matching_block
    passes = (
        # Pass 1: content ends in newline
        (self.content, self.assertFalse, self.with_matching_block),
        # Pass 2: content does not end in newline
        (self.content.rstrip("\r\n"), self.assertFalse, self.with_matching_block),
    )
    for content, first_check, expected in passes:
        self._write(name, initial)
        # Passes 1a/2a: the re-run must not report any changes
        for check_changes in (first_check, self.assertFalse):
            ret = self.run_state(
                "file.blockreplace",
                name=name,
                content=content,
                marker_start=self.marker_start,
                marker_end=self.marker_end,
            )
            self.assertSaltTrueReturn(ret)
            check_changes(ret[next(iter(ret))]["changes"])
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_matching_block_append_newline(self, name):
    """
    Run blockreplace against a file whose marked block already holds the
    requested content, with append_newline explicitly set to True. An
    extra newline is expected when the content ends in a newline, and no
    change is expected when it does not.
    """
    initial = self.with_matching_block
    passes = (
        # Pass 1: content ends in newline
        (
            self.content,
            self.assertTrue,
            self.with_matching_block_and_extra_newline,
        ),
        # Pass 2: content does not end in newline
        (self.content.rstrip("\r\n"), self.assertFalse, self.with_matching_block),
    )
    for content, first_check, expected in passes:
        self._write(name, initial)
        # Passes 1a/2a: the re-run must not report any changes
        for check_changes in (first_check, self.assertFalse):
            ret = self.run_state(
                "file.blockreplace",
                name=name,
                content=content,
                marker_start=self.marker_start,
                marker_end=self.marker_end,
                append_newline=True,
            )
            self.assertSaltTrueReturn(ret)
            check_changes(ret[next(iter(ret))]["changes"])
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_matching_block_no_append_newline(self, name):
    """
    Run blockreplace against a file whose marked block already holds the
    requested content, with append_newline explicitly set to False. When
    the content does not end in a newline, the marker_end is expected to
    end up not directly preceded by a newline.
    """
    initial = self.with_matching_block
    passes = (
        # Pass 1: content ends in newline
        (self.content, self.assertFalse, self.with_matching_block),
        # Pass 2: content does not end in newline
        (
            self.content.rstrip("\r\n"),
            self.assertTrue,
            self.with_matching_block_and_marker_end_not_after_newline,
        ),
    )
    for content, first_check, expected in passes:
        self._write(name, initial)
        # Passes 1a/2a: the re-run must not report any changes
        for check_changes in (first_check, self.assertFalse):
            ret = self.run_state(
                "file.blockreplace",
                name=name,
                content=content,
                marker_start=self.marker_start,
                marker_end=self.marker_end,
                append_newline=False,
            )
            self.assertSaltTrueReturn(ret)
            check_changes(ret[next(iter(ret))]["changes"])
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_matching_block_and_marker_not_after_newline(self, name):
    """
    Run blockreplace against a file whose marked block already holds the
    requested content, but whose marker_end is not directly preceded by a
    newline.
    """
    initial = self.with_matching_block_and_marker_end_not_after_newline
    passes = (
        # Pass 1: content ends in newline
        (self.content, self.assertTrue, self.with_matching_block),
        # Pass 2: content does not end in newline
        (self.content.rstrip("\r\n"), self.assertTrue, self.with_matching_block),
    )
    for content, first_check, expected in passes:
        self._write(name, initial)
        # Passes 1a/2a: the re-run must not report any changes
        for check_changes in (first_check, self.assertFalse):
            ret = self.run_state(
                "file.blockreplace",
                name=name,
                content=content,
                marker_start=self.marker_start,
                marker_end=self.marker_end,
            )
            self.assertSaltTrueReturn(ret)
            check_changes(ret[next(iter(ret))]["changes"])
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_matching_block_and_marker_not_after_newline_append_newline(self, name):
    """
    Run blockreplace against a file whose marked block already holds the
    requested content, but whose marker_end is not directly preceded by a
    newline, with append_newline explicitly set to True. An extra newline
    is expected when the content ends in a newline, and none when it does
    not.
    """
    initial = self.with_matching_block_and_marker_end_not_after_newline
    passes = (
        # Pass 1: content ends in newline
        (
            self.content,
            self.assertTrue,
            self.with_matching_block_and_extra_newline,
        ),
        # Pass 2: content does not end in newline
        (self.content.rstrip("\r\n"), self.assertTrue, self.with_matching_block),
    )
    for content, first_check, expected in passes:
        self._write(name, initial)
        # Passes 1a/2a: the re-run must not report any changes
        for check_changes in (first_check, self.assertFalse):
            ret = self.run_state(
                "file.blockreplace",
                name=name,
                content=content,
                marker_start=self.marker_start,
                marker_end=self.marker_end,
                append_newline=True,
            )
            self.assertSaltTrueReturn(ret)
            check_changes(ret[next(iter(ret))]["changes"])
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_matching_block_and_marker_not_after_newline_no_append_newline(self, name):
    """
    Run blockreplace against a file whose marked block already holds the
    requested content, but whose marker_end is not directly preceded by a
    newline, with append_newline explicitly set to False.
    """
    initial = self.with_matching_block_and_marker_end_not_after_newline
    passes = (
        # Pass 1: content ends in newline
        (self.content, self.assertTrue, self.with_matching_block),
        # Pass 2: content does not end in newline
        (
            self.content.rstrip("\r\n"),
            self.assertFalse,
            self.with_matching_block_and_marker_end_not_after_newline,
        ),
    )
    for content, first_check, expected in passes:
        self._write(name, initial)
        # Passes 1a/2a: the re-run must not report any changes
        for check_changes in (first_check, self.assertFalse):
            ret = self.run_state(
                "file.blockreplace",
                name=name,
                content=content,
                marker_start=self.marker_start,
                marker_end=self.marker_end,
                append_newline=False,
            )
            self.assertSaltTrueReturn(ret)
            check_changes(ret[next(iter(ret))]["changes"])
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_issue_49043(self, name):
    """
    Run the ``issue-49043`` SLS with ``name`` passed in as pillar data and
    check that the diff reported by the blockreplace state matches the
    expected three added lines, including the non-ASCII content line.
    """
    ret = self.run_function("state.sls", mods="issue-49043", pillar={"name": name})
    # Diagnostic output only: log at debug level and let the logging
    # machinery do the repr() lazily instead of writing to the error log
    # on every run.
    log.debug("ret = %r", ret)
    expected_diff = "--- \n+++ \n@@ -0,0 +1,3 @@\n"
    expected_diff += dedent(
        """\
        +#-- start managed zone --
        +äöü
        +#-- end managed zone --
        """
    )
    job = "file_|-somefile-blockreplace_|-{}_|-blockreplace".format(name)
    self.assertEqual(ret[job]["changes"]["diff"], expected_diff)
@pytest.mark.windows_whitelisted
class RemoteFileTest(ModuleCase, SaltReturnAssertsMixin):
    """
    Exercises file.managed states with http(s) sources served by a local
    tornado webserver, with and without skip_verify
    """

    @classmethod
    def setUpClass(cls):
        """
        Start the webserver and record the source URL and its expected hash
        """
        cls.webserver = Webserver()
        cls.webserver.start()
        cls.source = cls.webserver.url("grail/scene33")
        # CRLF vs LF causes a different hash on windows
        cls.source_hash = (
            "21438b3d5fd2c0028bcab92f7824dc69"
            if IS_WINDOWS
            else "d2feb3beb323c79fc7a0f44f1408b4a3"
        )

    @classmethod
    def tearDownClass(cls):
        """
        Stop the webserver started in setUpClass
        """
        cls.webserver.stop()

    @with_tempfile(create=False)
    def setUp(self, name):  # pylint: disable=arguments-differ
        """
        Remember the temp file path handed in by the decorator
        """
        self.name = name

    def tearDown(self):
        """
        Remove the managed file; a file that does not exist is not an error
        """
        try:
            os.remove(self.name)
        except OSError as exc:
            if exc.errno == errno.ENOENT:
                return
            raise

    def run_state(self, *args, **kwargs):  # pylint: disable=arguments-differ
        """
        Run the state through the parent class and log its return at debug
        level before handing it back
        """
        state_ret = super().run_state(*args, **kwargs)
        log.debug("ret = %s", state_ret)
        return state_ret

    def test_file_managed_http_source_no_hash(self):
        """
        Test a remote file with no hash
        """
        state_ret = self.run_state(
            "file.managed", name=self.name, source=self.source, skip_verify=False
        )
        # This should fail because no hash was provided
        self.assertSaltFalseReturn(state_ret)

    def test_file_managed_http_source(self):
        """
        Test a remote file with a source_hash provided
        """
        state_kwargs = {
            "name": self.name,
            "source": self.source,
            "source_hash": self.source_hash,
            "skip_verify": False,
        }
        state_ret = self.run_state("file.managed", **state_kwargs)
        self.assertSaltTrueReturn(state_ret)

    def test_file_managed_http_source_skip_verify(self):
        """
        Test a remote file using skip_verify
        """
        state_ret = self.run_state(
            "file.managed", name=self.name, source=self.source, skip_verify=True
        )
        self.assertSaltTrueReturn(state_ret)

    def test_file_managed_keep_source_false_http(self):
        """
        Ensure the cached copy of an http:// source is cleaned up when
        keep_source is set to False
        """
        # Run the state
        state_ret = self.run_state(
            "file.managed",
            name=self.name,
            source=self.source,
            source_hash=self.source_hash,
            keep_source=False,
        )
        single = state_ret[next(iter(state_ret))]
        assert single["result"] is True

        # Now make sure that the file is not cached
        cached = self.run_function("cp.is_cached", [self.source])
        assert cached == "", "File is still cached at {}".format(cached)
- @skipIf(not salt.utils.path.which("patch"), "patch is not installed")
- @pytest.mark.windows_whitelisted
- class PatchTest(ModuleCase, SaltReturnAssertsMixin):
def _check_patch_version(self, min_version):
    """
    Skip the current test when the installed ``patch`` is older than
    ``min_version``.

    The version is read from the first line of ``patch --version``. That
    line does not have a fixed layout (e.g. ``patch 2.5.4`` versus
    ``GNU patch 2.7.6``), so the first whitespace-separated token that
    starts with a digit is used. Blindly taking the second token, as was
    done previously, yields the word ``patch`` for the GNU layout and makes
    the comparison meaningless.
    """
    first_line = self.run_function("cmd.run", ["patch --version"]).splitlines()[0]
    tokens = first_line.split()
    version = next((token for token in tokens if token[:1].isdigit()), None)
    if version is None:
        # No token looks like a version number; keep the historical
        # behaviour of using the second token.
        version = tokens[1]
    if _LooseVersion(version) < _LooseVersion(min_version):
        self.skipTest(
            "Minimum patch version required: {}. "
            "Patch version installed: {}".format(min_version, version)
        )
@classmethod
def setUpClass(cls):
    """
    Start the webserver and precompute, for the numbers/math/all patches
    and their jinja templates, the file names, relative paths, salt://
    and http URLs and file hashes used by the tests.
    """
    cls.webserver = Webserver()
    cls.webserver.start()

    def relative(file_name):
        # Path of a patch file relative to the fileserver root
        return "patches/" + file_name

    def salt_url(rel_path):
        return "salt://" + rel_path

    # File names
    cls.numbers_patch_name = "numbers.patch"
    cls.math_patch_name = "math.patch"
    cls.all_patch_name = "all.patch"
    cls.numbers_patch_template_name = cls.numbers_patch_name + ".jinja"
    cls.math_patch_template_name = cls.math_patch_name + ".jinja"
    cls.all_patch_template_name = cls.all_patch_name + ".jinja"

    # Relative paths
    cls.numbers_patch_path = relative(cls.numbers_patch_name)
    cls.math_patch_path = relative(cls.math_patch_name)
    cls.all_patch_path = relative(cls.all_patch_name)
    cls.numbers_patch_template_path = relative(cls.numbers_patch_template_name)
    cls.math_patch_template_path = relative(cls.math_patch_template_name)
    cls.all_patch_template_path = relative(cls.all_patch_template_name)

    # salt:// URLs
    cls.numbers_patch = salt_url(cls.numbers_patch_path)
    cls.math_patch = salt_url(cls.math_patch_path)
    cls.all_patch = salt_url(cls.all_patch_path)
    cls.numbers_patch_template = salt_url(cls.numbers_patch_template_path)
    cls.math_patch_template = salt_url(cls.math_patch_template_path)
    cls.all_patch_template = salt_url(cls.all_patch_template_path)

    # URLs served by the local webserver
    serve = cls.webserver.url
    cls.numbers_patch_http = serve(cls.numbers_patch_path)
    cls.math_patch_http = serve(cls.math_patch_path)
    cls.all_patch_http = serve(cls.all_patch_path)
    cls.numbers_patch_template_http = serve(cls.numbers_patch_template_path)
    cls.math_patch_template_http = serve(cls.math_patch_template_path)
    cls.all_patch_template_http = serve(cls.all_patch_template_path)

    # Hashes of the files on disk
    patches_dir = os.path.join(RUNTIME_VARS.FILES, "file", "base", "patches")

    def file_hash(file_name):
        return salt.utils.hashutils.get_hash(os.path.join(patches_dir, file_name))

    cls.numbers_patch_hash = file_hash(cls.numbers_patch_name)
    cls.math_patch_hash = file_hash(cls.math_patch_name)
    cls.all_patch_hash = file_hash(cls.all_patch_name)
    cls.numbers_patch_template_hash = file_hash(cls.numbers_patch_template_name)
    cls.math_patch_template_hash = file_hash(cls.math_patch_template_name)
    cls.all_patch_template_hash = file_hash(cls.all_patch_template_name)

    cls.context = {"two": "two", "ten": 10}
@classmethod
def tearDownClass(cls):
    """
    Stop the webserver stored on ``cls.webserver``.
    """
    cls.webserver.stop()
def setUp(self):
    """
    Create a new unpatched set of files

    A fresh temp directory is created under ``RUNTIME_VARS.TMP`` holding
    ``foo/numbers.txt`` and ``foo/bar/math.txt``. Their paths are stored
    on ``self.numbers_file`` and ``self.math_file``, and the directory on
    ``self.base_dir``.
    """
    self.base_dir = tempfile.mkdtemp(dir=RUNTIME_VARS.TMP)
    # Register the cleanup right away so the directory is removed even if
    # one of the steps below raises and setUp never completes.
    self.addCleanup(shutil.rmtree, self.base_dir, ignore_errors=True)
    os.makedirs(os.path.join(self.base_dir, "foo", "bar"))
    self.numbers_file = os.path.join(self.base_dir, "foo", "numbers.txt")
    self.math_file = os.path.join(self.base_dir, "foo", "bar", "math.txt")
    with salt.utils.files.fopen(self.numbers_file, "w") as fp_:
        fp_.write(
            textwrap.dedent(
                """\
                one
                two
                three
                1
                2
                3
                """
            )
        )
    with salt.utils.files.fopen(self.math_file, "w") as fp_:
        fp_.write(
            textwrap.dedent(
                """\
                Five plus five is ten
                Four squared is sixteen
                """
            )
        )
def test_patch_single_file(self):
    """
    Apply a patch that targets a single file, then apply it a second time
    """

    def apply_patch():
        # Run the state, require success and hand back its single result
        ret = self.run_state(
            "file.patch", name=self.numbers_file, source=self.numbers_patch,
        )
        self.assertSaltTrueReturn(ret)
        return ret[next(iter(ret))]

    first_run = apply_patch()
    self.assertEqual(first_run["comment"], "Patch successfully applied")

    # Re-run the state, should succeed, report that the patch was already
    # applied and make no changes.
    second_run = apply_patch()
    self.assertEqual(second_run["comment"], "Patch was already applied")
    self.assertEqual(second_run["changes"], {})
def test_patch_directory(self):
    """
    Apply a patch whose changes span multiple files to a directory, then
    apply it a second time
    """
    self._check_patch_version("2.6")

    def apply_patch():
        # Run the state, require success and hand back its single result
        ret = self.run_state(
            "file.patch", name=self.base_dir, source=self.all_patch, strip=1,
        )
        self.assertSaltTrueReturn(ret)
        return ret[next(iter(ret))]

    first_run = apply_patch()
    self.assertEqual(first_run["comment"], "Patch successfully applied")

    # Re-run the state, should succeed, report that the patch was already
    # applied and make no changes.
    second_run = apply_patch()
    self.assertEqual(second_run["comment"], "Patch was already applied")
    self.assertEqual(second_run["changes"], {})
def test_patch_strip_parsing(self):
    """
    Check that -p/--strip is accepted when passed through the options
    """
    self._check_patch_version("2.6")

    def apply_patch(options):
        # Run the state, require success and hand back its single result
        ret = self.run_state(
            "file.patch", name=self.base_dir, source=self.all_patch, options=options,
        )
        self.assertSaltTrueReturn(ret)
        return ret[next(iter(ret))]

    # Run the state using -p1
    first_run = apply_patch("-p1")
    self.assertEqual(first_run["comment"], "Patch successfully applied")

    # Re-run the state using --strip=1, then using --strip 1
    for options in ("--strip=1", "--strip 1"):
        rerun = apply_patch(options)
        self.assertEqual(rerun["comment"], "Patch was already applied")
        self.assertEqual(rerun["changes"], {})
def test_patch_saltenv(self):
    """
    Test that the patch source is looked up in a non-base saltenv
    """
    expected_comment = "Source file {} not found in saltenv 'prod'".format(
        self.math_patch
    )

    # There is no patch file in the "prod" environment, so the state is
    # expected to fail. That is fine: the failure comment is what shows
    # that an environment other than base was searched.
    state_run = self.run_state(
        "file.patch", name=self.math_file, source=self.math_patch, saltenv="prod",
    )
    self.assertSaltFalseReturn(state_run)
    state_id = next(iter(state_run))
    self.assertEqual(state_run[state_id]["comment"], expected_comment)
def test_patch_single_file_failure(self):
    """
    Test file.patch using a patch applied to a single file. This tests a
    failed patch, first without and then with the ``reject_file`` option.
    """
    # Imported locally so the block does not rely on the module's imports.
    import os
    import re

    def _remove_quietly(path):
        """Delete ``path``, ignoring the case where it is already gone."""
        try:
            os.remove(path)
        except OSError:
            pass

    # Empty the file to ensure that the patch doesn't apply cleanly
    with salt.utils.files.fopen(self.numbers_file, "w"):
        pass

    ret = self.run_state(
        "file.patch", name=self.numbers_file, source=self.numbers_patch,
    )
    self.assertSaltFalseReturn(ret)
    ret = ret[next(iter(ret))]
    self.assertIn("Patch would not apply cleanly", ret["comment"])

    # Test the reject_file option and ensure that the rejects are written
    # to the path specified.
    reject_file = salt.utils.files.mkstemp()
    # Nothing else in this test removes the scratch path, so register a
    # cleanup that runs whether or not the assertions below pass.
    self.addCleanup(_remove_quietly, reject_file)

    ret = self.run_state(
        "file.patch",
        name=self.numbers_file,
        source=self.numbers_patch,
        reject_file=reject_file,
        strip=1,
    )
    self.assertSaltFalseReturn(ret)
    ret = ret[next(iter(ret))]
    self.assertIn("Patch would not apply cleanly", ret["comment"])
    # Escape the path so characters such as "." are matched literally
    # instead of being interpreted as regex syntax.
    self.assertRegex(
        ret["comment"],
        "saving rejects to (file )?{}".format(re.escape(reject_file)),
    )
def test_patch_directory_failure(self):
    """
    Test file.patch using a patch applied to a directory, with changes
    spanning multiple files. This tests a failed patch, first without and
    then with the ``reject_file`` option.
    """
    # Imported locally so the block does not rely on the module's imports.
    import os
    import re

    def _remove_quietly(path):
        """Delete ``path``, ignoring the case where it is already gone."""
        try:
            os.remove(path)
        except OSError:
            pass

    # Empty the file to ensure that the patch doesn't apply
    with salt.utils.files.fopen(self.math_file, "w"):
        pass

    ret = self.run_state(
        "file.patch", name=self.base_dir, source=self.all_patch, strip=1,
    )
    self.assertSaltFalseReturn(ret)
    ret = ret[next(iter(ret))]
    self.assertIn("Patch would not apply cleanly", ret["comment"])

    # Test the reject_file option and ensure that the rejects are written
    # to the path specified.
    reject_file = salt.utils.files.mkstemp()
    # Nothing else in this test removes the scratch path, so register a
    # cleanup that runs whether or not the assertions below pass.
    self.addCleanup(_remove_quietly, reject_file)

    ret = self.run_state(
        "file.patch",
        name=self.base_dir,
        source=self.all_patch,
        reject_file=reject_file,
        strip=1,
    )
    self.assertSaltFalseReturn(ret)
    ret = ret[next(iter(ret))]
    self.assertIn("Patch would not apply cleanly", ret["comment"])
    # Escape the path so characters such as "." are matched literally
    # instead of being interpreted as regex syntax.
    self.assertRegex(
        ret["comment"],
        "saving rejects to (file )?{}".format(re.escape(reject_file)),
    )
def test_patch_single_file_remote_source(self):
    """
    Patch a single file with file.patch where the patch is fetched from a
    remote source, covering the missing-hash, source_hash and skip_verify
    cases in turn.
    """
    remote = {"name": self.math_file, "source": self.math_patch_http}

    # Neither source_hash nor skip_verify=True is given, so the state must
    # fail with a message about the hash.
    result = self.run_state("file.patch", **remote)
    self.assertSaltFalseReturn(result)
    result = result[next(iter(result))]
    self.assertIn("Unable to verify upstream hash", result["comment"])

    # With a source hash the patch should now be applied.
    result = self.run_state(
        "file.patch", source_hash=self.math_patch_hash, **remote
    )
    self.assertSaltTrueReturn(result)
    result = result[next(iter(result))]
    self.assertEqual(result["comment"], "Patch successfully applied")

    # No hash again, but skip_verify=True this time, to exercise skipping
    # hash verification. The patch is in place already, so no changes.
    result = self.run_state("file.patch", skip_verify=True, **remote)
    self.assertSaltTrueReturn(result)
    result = result[next(iter(result))]
    self.assertEqual(result["comment"], "Patch was already applied")
    self.assertEqual(result["changes"], {})
def test_patch_directory_remote_source(self):
    """
    Patch a directory (changes spanning multiple files) with file.patch
    where the patch is fetched from a remote source, covering the
    missing-hash, source_hash and skip_verify cases in turn.
    """
    self._check_patch_version("2.6")

    remote = {"name": self.base_dir, "source": self.all_patch_http, "strip": 1}

    # Neither source_hash nor skip_verify=True is given, so the state must
    # fail with a message about the hash.
    result = self.run_state("file.patch", **remote)
    self.assertSaltFalseReturn(result)
    result = result[next(iter(result))]
    self.assertIn("Unable to verify upstream hash", result["comment"])

    # With a source hash the patch should now be applied.
    result = self.run_state(
        "file.patch", source_hash=self.all_patch_hash, **remote
    )
    self.assertSaltTrueReturn(result)
    result = result[next(iter(result))]
    self.assertEqual(result["comment"], "Patch successfully applied")

    # No hash again, but skip_verify=True this time, to exercise skipping
    # hash verification. The patch is in place already, so no changes.
    result = self.run_state("file.patch", skip_verify=True, **remote)
    self.assertSaltTrueReturn(result)
    result = result[next(iter(result))]
    self.assertEqual(result["comment"], "Patch was already applied")
    self.assertEqual(result["changes"], {})
def test_patch_single_file_template(self):
    """
    Patch a single file with file.patch using a patch file that is rendered
    through jinja, then re-run the state and expect it to be a no-op.
    """
    templated = {
        "name": self.numbers_file,
        "source": self.numbers_patch_template,
        "template": "jinja",
        "context": self.context,
    }

    # First pass applies the rendered patch.
    applied = self.run_state("file.patch", **templated)
    self.assertSaltTrueReturn(applied)
    applied = applied[next(iter(applied))]
    self.assertEqual(applied["comment"], "Patch successfully applied")

    # Second pass must succeed, report the patch as already applied and
    # make no changes.
    repeated = self.run_state("file.patch", **templated)
    self.assertSaltTrueReturn(repeated)
    repeated = repeated[next(iter(repeated))]
    self.assertEqual(repeated["comment"], "Patch was already applied")
    self.assertEqual(repeated["changes"], {})
def test_patch_directory_template(self):
    """
    Patch a directory (changes spanning multiple files) with file.patch
    using a patch file that is rendered through jinja, then re-run the
    state and expect it to be a no-op.
    """
    self._check_patch_version("2.6")

    templated = {
        "name": self.base_dir,
        "source": self.all_patch_template,
        "template": "jinja",
        "context": self.context,
        "strip": 1,
    }

    # First pass applies the rendered patch.
    applied = self.run_state("file.patch", **templated)
    self.assertSaltTrueReturn(applied)
    applied = applied[next(iter(applied))]
    self.assertEqual(applied["comment"], "Patch successfully applied")

    # Second pass must succeed, report the patch as already applied and
    # make no changes.
    repeated = self.run_state("file.patch", **templated)
    self.assertSaltTrueReturn(repeated)
    repeated = repeated[next(iter(repeated))]
    self.assertEqual(repeated["comment"], "Patch was already applied")
    self.assertEqual(repeated["changes"], {})
def test_patch_single_file_remote_source_template(self):
    """
    Patch a single file with file.patch where the patch comes from a remote
    source and is rendered through jinja, covering the missing-hash,
    source_hash and skip_verify cases in turn.
    """
    remote_template = {
        "name": self.math_file,
        "source": self.math_patch_template_http,
        "template": "jinja",
        "context": self.context,
    }

    # Neither source_hash nor skip_verify=True is given, so the state must
    # fail with a message about the hash.
    result = self.run_state("file.patch", **remote_template)
    self.assertSaltFalseReturn(result)
    result = result[next(iter(result))]
    self.assertIn("Unable to verify upstream hash", result["comment"])

    # With a source hash the patch should now be applied.
    result = self.run_state(
        "file.patch",
        source_hash=self.math_patch_template_hash,
        **remote_template
    )
    self.assertSaltTrueReturn(result)
    result = result[next(iter(result))]
    self.assertEqual(result["comment"], "Patch successfully applied")

    # No hash again, but skip_verify=True this time, to exercise skipping
    # hash verification. The patch is in place already, so no changes.
    result = self.run_state("file.patch", skip_verify=True, **remote_template)
    self.assertSaltTrueReturn(result)
    result = result[next(iter(result))]
    self.assertEqual(result["comment"], "Patch was already applied")
    self.assertEqual(result["changes"], {})
def test_patch_directory_remote_source_template(self):
    """
    Patch a directory (changes spanning multiple files) with file.patch
    where the patch comes from a remote source and is rendered through
    jinja, covering the missing-hash, source_hash and skip_verify cases in
    turn.
    """
    self._check_patch_version("2.6")

    remote_template = {
        "name": self.base_dir,
        "source": self.all_patch_template_http,
        "template": "jinja",
        "context": self.context,
        "strip": 1,
    }

    # Neither source_hash nor skip_verify=True is given, so the state must
    # fail with a message about the hash.
    result = self.run_state("file.patch", **remote_template)
    self.assertSaltFalseReturn(result)
    result = result[next(iter(result))]
    self.assertIn("Unable to verify upstream hash", result["comment"])

    # With a source hash the patch should now be applied.
    result = self.run_state(
        "file.patch",
        source_hash=self.all_patch_template_hash,
        **remote_template
    )
    self.assertSaltTrueReturn(result)
    result = result[next(iter(result))]
    self.assertEqual(result["comment"], "Patch successfully applied")

    # No hash again, but skip_verify=True this time, to exercise skipping
    # hash verification. The patch is in place already, so no changes.
    result = self.run_state("file.patch", skip_verify=True, **remote_template)
    self.assertSaltTrueReturn(result)
    result = result[next(iter(result))]
    self.assertEqual(result["comment"], "Patch was already applied")
    self.assertEqual(result["changes"], {})
def test_patch_test_mode(self):
    """
    Test file.patch using test=True against an unpatched file, an already
    patched file, and a file the patch cannot apply to.
    """
    patch_args = {"name": self.numbers_file, "source": self.numbers_patch}

    # Dry run against the unpatched file: a None return is expected, along
    # with a non-empty changes entry describing what would happen.
    outcome = self.run_state("file.patch", test=True, **patch_args)
    self.assertSaltNoneReturn(outcome)
    outcome = outcome[next(iter(outcome))]
    self.assertEqual(outcome["comment"], "The patch would be applied")
    self.assertTrue(outcome["changes"])

    # Apply the patch for real. This sets up the next step, which checks
    # that test=True on an already-applied patch gives a True result
    # rather than a None one.
    outcome = self.run_state("file.patch", **patch_args)
    self.assertSaltTrueReturn(outcome)
    outcome = outcome[next(iter(outcome))]
    self.assertEqual(outcome["comment"], "Patch successfully applied")
    self.assertTrue(outcome["changes"])

    # Dry run again. The pre-check happens before the __opts__['test']
    # check, so the result should be True, exactly as it would be when
    # running on an already-patched file *without* test=True.
    outcome = self.run_state("file.patch", test=True, **patch_args)
    self.assertSaltTrueReturn(outcome)
    outcome = outcome[next(iter(outcome))]
    self.assertEqual(outcome["comment"], "Patch was already applied")
    self.assertEqual(outcome["changes"], {})

    # Empty the file to ensure that the patch doesn't apply cleanly
    with salt.utils.files.fopen(self.numbers_file, "w"):
        pass

    # One more dry run. As above, we are testing that we return before the
    # __opts__['test'] check is reached; here the result should be False
    # because it is already known that the patch will not apply cleanly.
    outcome = self.run_state("file.patch", test=True, **patch_args)
    self.assertSaltFalseReturn(outcome)
    outcome = outcome[next(iter(outcome))]
    self.assertIn("Patch would not apply cleanly", outcome["comment"])
    self.assertEqual(outcome["changes"], {})
# Scratch file that WinFileTest creates in setUp() and removes in tearDown().
WIN_TEST_FILE = "c:/testfile"


@destructiveTest
@skipIf(not IS_WINDOWS, "windows test only")
@pytest.mark.windows_whitelisted
class WinFileTest(ModuleCase):
    """
    Test for the file state on Windows
    """

    def _assert_state_succeeded(self, ret):
        """
        Fail unless ``ret`` is a non-empty dict of state returns whose
        ``result`` entries are all truthy.

        A bare ``assertTrue(ret)`` is not enough: a failed state still
        yields a non-empty (and therefore truthy) return value.
        """
        self.assertIsInstance(ret, dict, ret)
        self.assertTrue(ret, "state run returned no results")
        for state_ret in ret.values():
            self.assertTrue(state_ret["result"], state_ret)

    def setUp(self):
        self.run_state(
            "file.managed", name=WIN_TEST_FILE, makedirs=True, contents="Only a test"
        )

    def tearDown(self):
        self.run_state("file.absent", name=WIN_TEST_FILE)

    def test_file_managed(self):
        """
        Test file.managed on Windows
        """
        # setUp() created the file through file.managed; confirm it exists.
        ret = self.run_state("file.exists", name=WIN_TEST_FILE)
        self._assert_state_succeeded(ret)

    def test_file_copy(self):
        """
        Test file.copy on Windows
        """
        copy_target = "c:/testfile_copy"
        # tearDown() only removes WIN_TEST_FILE, so remove the copy as well.
        self.addCleanup(self.run_state, "file.absent", name=copy_target)
        ret = self.run_state(
            "file.copy", name=copy_target, makedirs=True, source=WIN_TEST_FILE
        )
        self._assert_state_succeeded(ret)

    def test_file_comment(self):
        """
        Test file.comment on Windows
        """
        ret = self.run_state("file.comment", name=WIN_TEST_FILE, regex="^Only")
        self._assert_state_succeeded(ret)
        with salt.utils.files.fopen(WIN_TEST_FILE, "r") as fp_:
            self.assertTrue(fp_.read().startswith("#Only"))

    def test_file_replace(self):
        """
        Test file.replace on Windows
        """
        ret = self.run_state(
            "file.replace", name=WIN_TEST_FILE, pattern="test", repl="testing"
        )
        self._assert_state_succeeded(ret)
        with salt.utils.files.fopen(WIN_TEST_FILE, "r") as fp_:
            self.assertIn("testing", fp_.read())

    def test_file_absent(self):
        """
        Test file.absent on Windows
        """
        ret = self.run_state("file.absent", name=WIN_TEST_FILE)
        self._assert_state_succeeded(ret)
|